import time, os.path, platform, stat, re, simplejson, struct

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
class FakeCanary:
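    """
    A stand-in for the 'canary' that a live foolscap connection would
    provide. The storage server registers disconnect callbacks on it, and
    tests fire the entries in .disconnectors by hand to simulate a client
    disconnecting partway through an upload.
    """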
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:
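    """
    A loopback substitute for a foolscap RemoteReference: callRemote()
    dispatches to the target's matching remote_* method and wraps the
    result in a Deferred, which is all the bucket proxies under test need.
    """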

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
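        #
        # (Checking the arithmetic: 0x24 == 36 and 0x44 == 68 are the v1 and
        # v2 header sizes used by the two proxies below, the third 7*32
        # region is presumably the reserved plaintext_hash_tree slot in the
        # layout, and 4+500 is a 4-byte length prefix plus the uri_extension
        # itself: 36+100+672+102+504 == 1414 and 68+100+672+102+504 == 1446.)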

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)
    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems without efficient sparse-file support (Mac OS X and Windows do not support efficient sparse files).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an existing share (but is not the
        exact same storage index) does not add a new entry to the top-level
        shares directory.
        """
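        # Shares land in shares/<prefix>/<storage-index>/<shnum>, where
        # <prefix> is derived from the leading bits of the storage index
        # (see storage_index_to_dir), so two indexes differing only in
        # their last character should map to the same prefix directory.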
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()
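        # (The first four bytes of an immutable share file hold the
        # container version as a big-endian uint32, so overwriting them
        # with zero is enough to make the container look unrecognized.)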

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
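        # (These figures presumably mirror the immutable share layout: a
        # header of three 4-byte fields, plus one 72-byte lease record
        # holding owner number, renew secret, cancel secret, and
        # expiration time.)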
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
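        # The bytes between the end of "start" and offset 100 become a
        # hole: on filesystems with sparse-file support they consume no
        # disk space, and they read back as zeros either way.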
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but
        # sparse: the writes above were dropped, so the data we read back
        # will be all zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
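        # Each entry maps a share number to a (testv, datav, new_length)
        # tuple: testv is a list of (offset, length, operator, specimen)
        # comparisons that must all pass, datav is a list of (offset, data)
        # writes to apply if they do, and new_length (when not None)
        # resizes the container. Empty vectors, as here, just create the
        # slots.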
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test each operator; the data we're comparing against is "11111"
        # in all cases. test both fail and pass, resetting the data after
        # each one.
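        # Each test vector compares the current contents at
        # [offset:offset+length] against the specimen using the given
        # operator; the write vector is applied only if every test passes,
        # and the read vector results come back either way, so each case
        # below checks both the (wrote, read_data) answer and the final
        # share contents.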
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots with the same secrets; that should update
        # the existing lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
1279         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1280                              {})
1281         # and the bucket directory should now be gone
1282         si = base32.b2a("si1")
1283         # note: this is a detail of the storage server implementation, and
1284         # may change in the future
1285         prefix = si[:2]
1286         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1287         bucketdir = os.path.join(prefixdir, si)
1288         self.failUnless(os.path.exists(prefixdir), prefixdir)
1289         self.failIf(os.path.exists(bucketdir), bucketdir)
1290
1291 class Stats(unittest.TestCase):
1292
1293     def setUp(self):
1294         self.sparent = LoggingServiceParent()
1295         self._lease_secret = itertools.count()
1296     def tearDown(self):
1297         return self.sparent.stopService()
1298
1299     def workdir(self, name):
1300         basedir = os.path.join("storage", "Server", name)
1301         return basedir
1302
1303     def create(self, name):
1304         workdir = self.workdir(name)
1305         ss = StorageServer(workdir, "\x00" * 20)
1306         ss.setServiceParent(self.sparent)
1307         return ss
1308
1309     def test_latencies(self):
1310         ss = self.create("test_latencies")
1311         for i in range(10000):
1312             ss.add_latency("allocate", 1.0 * i)
1313         for i in range(1000):
1314             ss.add_latency("renew", 1.0 * i)
1315         for i in range(20):
1316             ss.add_latency("write", 1.0 * i)
1317         for i in range(10):
1318             ss.add_latency("cancel", 2.0 * i)
1319         ss.add_latency("get", 5.0)
1320
1321         output = ss.get_latencies()
1322
1323         self.failUnlessEqual(sorted(output.keys()),
1324                              sorted(["allocate", "renew", "cancel", "write", "get"]))
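        # each category retains only its most recent 1000 samples, so
        # "allocate" holds 9000..9999: the mean is 9499.5 (~9500) and the
        # k-percentile is samples[int(k*1000)], e.g. 1% -> samples[10] = 9010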
1325         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1326         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1327         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1328         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1329         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1330         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1331         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1332         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1333         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1334
1335         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1336         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1337         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1338         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1339         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1340         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1341         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1342         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1343         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1344
1345         self.failUnlessEqual(len(ss.latencies["write"]), 20)
1346         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
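        # percentiles that cannot be estimated from only 20 samples are
        # reported as None (here the 1%, 99%, and 99.9% percentiles)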
1347         self.failUnless(output["write"]["01_0_percentile"] is None, output)
1348         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
1349         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
1350         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
1351         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
1352         self.failUnless(output["write"]["99_0_percentile"] is None, output)
1353         self.failUnless(output["write"]["99_9_percentile"] is None, output)
1354
1355         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1356         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1357         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
1358         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1359         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1360         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1361         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
1362         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
1363         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
1364
1365         self.failUnlessEqual(len(ss.latencies["get"]), 1)
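        # with a single sample, even the mean is reported as None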
1366         self.failUnless(output["get"]["mean"] is None, output)
1367         self.failUnless(output["get"]["01_0_percentile"] is None, output)
1368         self.failUnless(output["get"]["10_0_percentile"] is None, output)
1369         self.failUnless(output["get"]["50_0_percentile"] is None, output)
1370         self.failUnless(output["get"]["90_0_percentile"] is None, output)
1371         self.failUnless(output["get"]["95_0_percentile"] is None, output)
1372         self.failUnless(output["get"]["99_0_percentile"] is None, output)
1373         self.failUnless(output["get"]["99_9_percentile"] is None, output)
1374
1375 def remove_tags(s):
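    # e.g. remove_tags("<p>Total <b>buckets</b>: 0</p>") -> " Total buckets : 0 "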
1376     s = re.sub(r'<[^>]*>', ' ', s)
1377     s = re.sub(r'\s+', ' ', s)
1378     return s
1379
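# test double: each time the crawler finishes a prefix directory, pop the
# next Deferred off self.hook_ds and fire it, so tests can synchronize with
# the crawler's progress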
1380 class MyBucketCountingCrawler(BucketCountingCrawler):
1381     def finished_prefix(self, cycle, prefix):
1382         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1383         if self.hook_ds:
1384             d = self.hook_ds.pop(0)
1385             d.callback(None)
1386
1387 class MyStorageServer(StorageServer):
1388     def add_bucket_counter(self):
1389         statefile = os.path.join(self.storedir, "bucket_counter.state")
1390         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1391         self.bucket_counter.setServiceParent(self)
1392
1393 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1394
1395     def setUp(self):
1396         self.s = service.MultiService()
1397         self.s.startService()
1398     def tearDown(self):
1399         return self.s.stopService()
1400
1401     def test_bucket_counter(self):
1402         basedir = "storage/BucketCounter/bucket_counter"
1403         fileutil.make_dirs(basedir)
1404         ss = StorageServer(basedir, "\x00" * 20)
1405         # to make sure we capture the bucket-counting-crawler in the middle
1406         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1407         # also make it start sooner than usual.
1408         ss.bucket_counter.slow_start = 0
1409         orig_cpu_slice = ss.bucket_counter.cpu_slice
1410         ss.bucket_counter.cpu_slice = 0
1411         ss.setServiceParent(self.s)
1412
1413         w = StorageStatus(ss)
1414
1415         # this sample is before the crawler has started doing anything
1416         html = w.renderSynchronously()
1417         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1418         s = remove_tags(html)
1419         self.failUnlessIn("Accepting new shares: Yes", s)
1420         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1421         self.failUnlessIn("Total buckets: Not computed yet", s)
1422         self.failUnlessIn("Next crawl in", s)
1423
1424         # give the bucket-counting-crawler one tick to get started. The
1425         # cpu_slice=0 will force it to yield right after it processes the
1426         # first prefix
1427
1428         d = fireEventually()
1429         def _check(ignored):
1430             # are we really right after the first prefix?
1431             state = ss.bucket_counter.get_state()
1432             if state["last-complete-prefix"] is None:
1433                 d2 = fireEventually()
1434                 d2.addCallback(_check)
1435                 return d2
1436             self.failUnlessEqual(state["last-complete-prefix"],
1437                                  ss.bucket_counter.prefixes[0])
1438             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1439             html = w.renderSynchronously()
1440             s = remove_tags(html)
1441             self.failUnlessIn(" Current crawl ", s)
1442             self.failUnlessIn(" (next work in ", s)
1443         d.addCallback(_check)
1444
1445         # now give it enough time to complete a full cycle
1446         def _watch():
1447             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448         d.addCallback(lambda ignored: self.poll(_watch))
1449         def _check2(ignored):
1450             ss.bucket_counter.cpu_slice = orig_cpu_slice
1451             html = w.renderSynchronously()
1452             s = remove_tags(html)
1453             self.failUnlessIn("Total buckets: 0 (the number of", s)
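            # the bucket counter is a slow crawler with a minimum cycle time
            # of one hour, so the next crawl lands roughly 59-60 minutes out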
1454             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1455         d.addCallback(_check2)
1456         return d
1457
1458     def test_bucket_counter_cleanup(self):
1459         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1460         fileutil.make_dirs(basedir)
1461         ss = StorageServer(basedir, "\x00" * 20)
1462         # to make sure we capture the bucket-counting-crawler in the middle
1463         # of a cycle, we reach in and reduce its maximum slice time to 0.
1464         ss.bucket_counter.slow_start = 0
1465         orig_cpu_slice = ss.bucket_counter.cpu_slice
1466         ss.bucket_counter.cpu_slice = 0
1467         ss.setServiceParent(self.s)
1468
1469         d = fireEventually()
1470
1471         def _after_first_prefix(ignored):
1472             state = ss.bucket_counter.state
1473             if state["last-complete-prefix"] is None:
1474                 d2 = fireEventually()
1475                 d2.addCallback(_after_first_prefix)
1476                 return d2
1477             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1478             # now sneak in and mess with its state, to make sure it cleans up
1479             # properly at the end of the cycle
1480             self.failUnlessEqual(state["last-complete-prefix"],
1481                                  ss.bucket_counter.prefixes[0])
1482             state["bucket-counts"][-12] = {}
1483             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1484             ss.bucket_counter.save_state()
1485         d.addCallback(_after_first_prefix)
1486
1487         # now give it enough time to complete a cycle
1488         def _watch():
1489             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1490         d.addCallback(lambda ignored: self.poll(_watch))
1491         def _check2(ignored):
1492             ss.bucket_counter.cpu_slice = orig_cpu_slice
1493             s = ss.bucket_counter.get_state()
1494             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1495             self.failIf("bogusprefix!" in s["storage-index-samples"],
1496                         s["storage-index-samples"].keys())
1497         d.addCallback(_check2)
1498         return d
1499
1500     def test_bucket_counter_eta(self):
1501         basedir = "storage/BucketCounter/bucket_counter_eta"
1502         fileutil.make_dirs(basedir)
1503         ss = MyStorageServer(basedir, "\x00" * 20)
1504         ss.bucket_counter.slow_start = 0
1505         # these will be fired inside finished_prefix()
1506         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1507         w = StorageStatus(ss)
1508
1509         d = defer.Deferred()
1510
1511         def _check_1(ignored):
1512             # no ETA is available yet
1513             html = w.renderSynchronously()
1514             s = remove_tags(html)
1515             self.failUnlessIn("complete (next work", s)
1516
1517         def _check_2(ignored):
1518             # one prefix has finished, so an ETA based upon that elapsed time
1519             # should be available.
1520             html = w.renderSynchronously()
1521             s = remove_tags(html)
1522             self.failUnlessIn("complete (ETA ", s)
1523
1524         def _check_3(ignored):
1525             # two prefixes have finished
1526             html = w.renderSynchronously()
1527             s = remove_tags(html)
1528             self.failUnlessIn("complete (ETA ", s)
1529             d.callback("done")
1530
1531         hooks[0].addCallback(_check_1).addErrback(d.errback)
1532         hooks[1].addCallback(_check_2).addErrback(d.errback)
1533         hooks[2].addCallback(_check_3).addErrback(d.errback)
1534
1535         ss.setServiceParent(self.s)
1536         return d
1537
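# test double: after the first bucket is processed, drive cpu_slice negative
# so the crawler yields immediately, giving tests a chance to observe its
# mid-cycle state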
1538 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1539     stop_after_first_bucket = False
1540     def process_bucket(self, *args, **kwargs):
1541         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1542         if self.stop_after_first_bucket:
1543             self.stop_after_first_bucket = False
1544             self.cpu_slice = -1.0
1545     def yielding(self, sleep_time):
1546         if not self.stop_after_first_bucket:
1547             self.cpu_slice = 500
1548
1549 class BrokenStatResults:
1550     pass
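# wraps os.stat() results in a copy that omits st_blocks, simulating
# platforms (e.g. Windows) whose stat results lack that field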
1551 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1552     def stat(self, fn):
1553         s = os.stat(fn)
1554         bsr = BrokenStatResults()
1555         for attrname in dir(s):
1556             if attrname.startswith("_"):
1557                 continue
1558             if attrname == "st_blocks":
1559                 continue
1560             setattr(bsr, attrname, getattr(s, attrname))
1561         return bsr
1562
1563 class InstrumentedStorageServer(StorageServer):
1564     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1565 class No_ST_BLOCKS_StorageServer(StorageServer):
1566     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1567
1568 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1569
1570     def setUp(self):
1571         self.s = service.MultiService()
1572         self.s.startService()
1573     def tearDown(self):
1574         return self.s.stopService()
1575
1576     def make_shares(self, ss):
1577         def make(si):
1578             return (si, hashutil.tagged_hash("renew", si),
1579                     hashutil.tagged_hash("cancel", si))
1580         def make_mutable(si):
1581             return (si, hashutil.tagged_hash("renew", si),
1582                     hashutil.tagged_hash("cancel", si),
1583                     hashutil.tagged_hash("write-enabler", si))
1584         def make_extra_lease(si, num):
1585             return (hashutil.tagged_hash("renew-%d" % num, si),
1586                     hashutil.tagged_hash("cancel-%d" % num, si))
1587
1588         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1589         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1590         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1591         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1592         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1593         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1594         sharenums = [0]
1595         canary = FakeCanary()
1596         # note: 'tahoe debug dump-share' will not handle this file, since the
1597         # inner contents are not a valid CHK share
1598         data = "\xff" * 1000
1599
1600         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1601                                          1000, canary)
1602         w[0].remote_write(0, data)
1603         w[0].remote_close()
1604
1605         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1606                                          1000, canary)
1607         w[0].remote_write(0, data)
1608         w[0].remote_close()
1609         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1610
1611         writev = ss.remote_slot_testv_and_readv_and_writev
1612         writev(mutable_si_2, (we2, rs2, cs2),
1613                {0: ([], [(0,data)], len(data))}, [])
1614         writev(mutable_si_3, (we3, rs3, cs3),
1615                {0: ([], [(0,data)], len(data))}, [])
1616         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1617
1618         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1619         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1620         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1621
1622     def test_basic(self):
1623         basedir = "storage/LeaseCrawler/basic"
1624         fileutil.make_dirs(basedir)
1625         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1626         # make it start sooner than usual.
1627         lc = ss.lease_checker
1628         lc.slow_start = 0
1629         lc.cpu_slice = 500
1630         lc.stop_after_first_bucket = True
1631         webstatus = StorageStatus(ss)
1632
1633         # create a few shares, with some leases on them
1634         self.make_shares(ss)
1635         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1636
1637         # add a non-sharefile to exercise another code path
1638         fn = os.path.join(ss.sharedir,
1639                           storage_index_to_dir(immutable_si_0),
1640                           "not-a-share")
1641         f = open(fn, "wb")
1642         f.write("I am not a share.\n")
1643         f.close()
1644
1645         # this is before the crawl has started, so we're not in a cycle yet
1646         initial_state = lc.get_state()
1647         self.failIf(lc.get_progress()["cycle-in-progress"])
1648         self.failIfIn("cycle-to-date", initial_state)
1649         self.failIfIn("estimated-remaining-cycle", initial_state)
1650         self.failIfIn("estimated-current-cycle", initial_state)
1651         self.failUnlessIn("history", initial_state)
1652         self.failUnlessEqual(initial_state["history"], {})
1653
1654         ss.setServiceParent(self.s)
1655
1656         DAY = 24*60*60
1657
1658         d = fireEventually()
1659
1660         # now examine the state right after the first bucket has been
1661         # processed.
1662         def _after_first_bucket(ignored):
1663             initial_state = lc.get_state()
1664             if "cycle-to-date" not in initial_state:
1665                 d2 = fireEventually()
1666                 d2.addCallback(_after_first_bucket)
1667                 return d2
1668             self.failUnlessIn("cycle-to-date", initial_state)
1669             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1670             self.failUnlessIn("estimated-current-cycle", initial_state)
1671             self.failUnlessIn("history", initial_state)
1672             self.failUnlessEqual(initial_state["history"], {})
1673
1674             so_far = initial_state["cycle-to-date"]
1675             self.failUnlessEqual(so_far["expiration-enabled"], False)
1676             self.failUnlessIn("configured-expiration-mode", so_far)
1677             self.failUnlessIn("lease-age-histogram", so_far)
1678             lah = so_far["lease-age-histogram"]
1679             self.failUnlessEqual(type(lah), list)
1680             self.failUnlessEqual(len(lah), 1)
1681             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1682             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1683             self.failUnlessEqual(so_far["corrupt-shares"], [])
1684             sr1 = so_far["space-recovered"]
1685             self.failUnlessEqual(sr1["examined-buckets"], 1)
1686             self.failUnlessEqual(sr1["examined-shares"], 1)
1687             self.failUnlessEqual(sr1["actual-shares"], 0)
1688             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1689             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1690             left = initial_state["estimated-remaining-cycle"]
1691             sr2 = left["space-recovered"]
1692             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1693             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1694             self.failIfEqual(sr2["actual-shares"], None)
1695             self.failIfEqual(sr2["configured-diskbytes"], None)
1696             self.failIfEqual(sr2["original-sharebytes"], None)
1697         d.addCallback(_after_first_bucket)
1698         d.addCallback(lambda ign: self.render1(webstatus))
1699         def _check_html_in_cycle(html):
1700             s = remove_tags(html)
1701             self.failUnlessIn("So far, this cycle has examined "
1702                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1703             self.failUnlessIn("and has recovered: "
1704                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1705                               "0 B (0 B / 0 B)", s)
1706             self.failUnlessIn("If expiration were enabled, "
1707                               "we would have recovered: "
1708                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1709                               " 0 B (0 B / 0 B) by now", s)
1710             self.failUnlessIn("and the remainder of this cycle "
1711                               "would probably recover: "
1712                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1713                               " 0 B (0 B / 0 B)", s)
1714             self.failUnlessIn("and the whole cycle would probably recover: "
1715                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1716                               " 0 B (0 B / 0 B)", s)
1717             self.failUnlessIn("if we were strictly using each lease's default "
1718                               "31-day lease lifetime", s)
1719             self.failUnlessIn("this cycle would be expected to recover: ", s)
1720         d.addCallback(_check_html_in_cycle)
1721
1722         # wait for the crawler to finish the first cycle. Nothing should have
1723         # been removed.
1724         def _wait():
1725             return lc.get_state()["last-cycle-finished"] is not None
1726         d.addCallback(lambda ign: self.poll(_wait))
1727
1728         def _after_first_cycle(ignored):
1729             s = lc.get_state()
1730             self.failIf("cycle-to-date" in s)
1731             self.failIf("estimated-remaining-cycle" in s)
1732             self.failIf("estimated-current-cycle" in s)
1733             last = s["history"][0]
1734             self.failUnlessIn("cycle-start-finish-times", last)
1735             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1736             self.failUnlessEqual(last["expiration-enabled"], False)
1737             self.failUnlessIn("configured-expiration-mode", last)
1738
1739             self.failUnlessIn("lease-age-histogram", last)
1740             lah = last["lease-age-histogram"]
1741             self.failUnlessEqual(type(lah), list)
1742             self.failUnlessEqual(len(lah), 1)
1743             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1744
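            # si_0 and si_2 carry a single lease each; si_1 and si_3 each
            # received an extra one via remote_add_lease in make_shares()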
1745             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1746             self.failUnlessEqual(last["corrupt-shares"], [])
1747
1748             rec = last["space-recovered"]
1749             self.failUnlessEqual(rec["examined-buckets"], 4)
1750             self.failUnlessEqual(rec["examined-shares"], 4)
1751             self.failUnlessEqual(rec["actual-buckets"], 0)
1752             self.failUnlessEqual(rec["original-buckets"], 0)
1753             self.failUnlessEqual(rec["configured-buckets"], 0)
1754             self.failUnlessEqual(rec["actual-shares"], 0)
1755             self.failUnlessEqual(rec["original-shares"], 0)
1756             self.failUnlessEqual(rec["configured-shares"], 0)
1757             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1758             self.failUnlessEqual(rec["original-diskbytes"], 0)
1759             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1760             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1761             self.failUnlessEqual(rec["original-sharebytes"], 0)
1762             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1763
1764             def _get_sharefile(si):
1765                 return list(ss._iter_share_files(si))[0]
1766             def count_leases(si):
1767                 return len(list(_get_sharefile(si).get_leases()))
1768             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1769             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1770             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1771             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1772         d.addCallback(_after_first_cycle)
1773         d.addCallback(lambda ign: self.render1(webstatus))
1774         def _check_html(html):
1775             s = remove_tags(html)
1776             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1777                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1778             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1779                               "(2 mutable / 2 immutable),", s)
1780             self.failUnlessIn("but expiration was not enabled", s)
1781         d.addCallback(_check_html)
1782         d.addCallback(lambda ign: self.render_json(webstatus))
1783         def _check_json(json):
1784             data = simplejson.loads(json)
1785             self.failUnlessIn("lease-checker", data)
1786             self.failUnlessIn("lease-checker-progress", data)
1787         d.addCallback(_check_json)
1788         return d
1789
1790     def backdate_lease(self, sf, renew_secret, new_expire_time):
1791         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1792         # "renew" a lease with a new_expire_time that is older than what the
1793         # current lease has), so we have to reach inside it.
1794         for i,lease in enumerate(sf.get_leases()):
1795             if lease.renew_secret == renew_secret:
1796                 lease.expiration_time = new_expire_time
1797                 f = open(sf.home, 'rb+')
1798                 sf._write_lease_record(f, i, lease)
1799                 f.close()
1800                 return
1801         raise IndexError("unable to renew non-existent lease")
1802
1803     def test_expire_age(self):
1804         basedir = "storage/LeaseCrawler/expire_age"
1805         fileutil.make_dirs(basedir)
1806         # setting expiration_override_lease_duration to 2000 means that
1807         # any lease which is more than 2000s old will be expired.
1808         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1809                                        expiration_enabled=True,
1810                                        expiration_mode="age",
1811                                        expiration_override_lease_duration=2000)
1812         # make it start sooner than usual.
1813         lc = ss.lease_checker
1814         lc.slow_start = 0
1815         lc.stop_after_first_bucket = True
1816         webstatus = StorageStatus(ss)
1817
1818         # create a few shares, with some leases on them
1819         self.make_shares(ss)
1820         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1821
1822         def count_shares(si):
1823             return len(list(ss._iter_share_files(si)))
1824         def _get_sharefile(si):
1825             return list(ss._iter_share_files(si))[0]
1826         def count_leases(si):
1827             return len(list(_get_sharefile(si).get_leases()))
1828
1829         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1830         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1831         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1832         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1833         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1834         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1835         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1836         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1837
1838         # artificially crank back the expiration time on the first lease of
1839         # each share, to make it look like it expired already (age=1000s).
1840         # Some shares have an extra lease which is set to expire at the
1841         # default time in 31 days from now (age=31days). We then run the
1842         # crawler, which will expire the first lease, making some shares get
1843         # deleted and others stay alive (with one remaining lease)
1844         now = time.time()
1845
1846         sf0 = _get_sharefile(immutable_si_0)
1847         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1848         sf0_size = os.stat(sf0.home).st_size
1849
1850         # immutable_si_1 gets an extra lease
1851         sf1 = _get_sharefile(immutable_si_1)
1852         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1853
1854         sf2 = _get_sharefile(mutable_si_2)
1855         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1856         sf2_size = os.stat(sf2.home).st_size
1857
1858         # mutable_si_3 gets an extra lease
1859         sf3 = _get_sharefile(mutable_si_3)
1860         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1861
1862         ss.setServiceParent(self.s)
1863
1864         d = fireEventually()
1865         # examine the state right after the first bucket has been processed
1866         def _after_first_bucket(ignored):
1867             p = lc.get_progress()
1868             if not p["cycle-in-progress"]:
1869                 d2 = fireEventually()
1870                 d2.addCallback(_after_first_bucket)
1871                 return d2
1872         d.addCallback(_after_first_bucket)
1873         d.addCallback(lambda ign: self.render1(webstatus))
1874         def _check_html_in_cycle(html):
1875             s = remove_tags(html)
1876             # the first bucket encountered gets deleted, and its prefix
1877             # happens to be about 1/5th of the way through the ring, so the
1878             # predictor thinks we'll have 5 shares and that we'll delete them
1879             # all. This part of the test depends upon the SIs landing right
1880             # where they do now.
1881             self.failUnlessIn("The remainder of this cycle is expected to "
1882                               "recover: 4 shares, 4 buckets", s)
1883             self.failUnlessIn("The whole cycle is expected to examine "
1884                               "5 shares in 5 buckets and to recover: "
1885                               "5 shares, 5 buckets", s)
1886         d.addCallback(_check_html_in_cycle)
1887
1888         # wait for the crawler to finish the first cycle. Two shares should
1889         # have been removed
1890         def _wait():
1891             return lc.get_state()["last-cycle-finished"] is not None
1892         d.addCallback(lambda ign: self.poll(_wait))
1893
1894         def _after_first_cycle(ignored):
1895             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1896             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1897             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1898             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1899             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1900             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1901
1902             s = lc.get_state()
1903             last = s["history"][0]
1904
1905             self.failUnlessEqual(last["expiration-enabled"], True)
1906             self.failUnlessEqual(last["configured-expiration-mode"],
1907                                  ("age", 2000, None, ("mutable", "immutable")))
1908             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1909
1910             rec = last["space-recovered"]
1911             self.failUnlessEqual(rec["examined-buckets"], 4)
1912             self.failUnlessEqual(rec["examined-shares"], 4)
1913             self.failUnlessEqual(rec["actual-buckets"], 2)
1914             self.failUnlessEqual(rec["original-buckets"], 2)
1915             self.failUnlessEqual(rec["configured-buckets"], 2)
1916             self.failUnlessEqual(rec["actual-shares"], 2)
1917             self.failUnlessEqual(rec["original-shares"], 2)
1918             self.failUnlessEqual(rec["configured-shares"], 2)
1919             size = sf0_size + sf2_size
1920             self.failUnlessEqual(rec["actual-sharebytes"], size)
1921             self.failUnlessEqual(rec["original-sharebytes"], size)
1922             self.failUnlessEqual(rec["configured-sharebytes"], size)
1923             # different platforms have different notions of "blocks used by
1924             # this file", so merely assert that it's a number
1925             self.failUnless(rec["actual-diskbytes"] >= 0,
1926                             rec["actual-diskbytes"])
1927             self.failUnless(rec["original-diskbytes"] >= 0,
1928                             rec["original-diskbytes"])
1929             self.failUnless(rec["configured-diskbytes"] >= 0,
1930                             rec["configured-diskbytes"])
1931         d.addCallback(_after_first_cycle)
1932         d.addCallback(lambda ign: self.render1(webstatus))
1933         def _check_html(html):
1934             s = remove_tags(html)
1935             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1936             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1937             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1938         d.addCallback(_check_html)
1939         return d
1940
1941     def test_expire_cutoff_date(self):
1942         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1943         fileutil.make_dirs(basedir)
1944         # setting cutoff-date to 2000 seconds ago means that any lease which
1945         # is more than 2000s old will be expired.
1946         now = time.time()
1947         then = int(now - 2000)
1948         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1949                                        expiration_enabled=True,
1950                                        expiration_mode="cutoff-date",
1951                                        expiration_cutoff_date=then)
1952         # make it start sooner than usual.
1953         lc = ss.lease_checker
1954         lc.slow_start = 0
1955         lc.stop_after_first_bucket = True
1956         webstatus = StorageStatus(ss)
1957
1958         # create a few shares, with some leases on them
1959         self.make_shares(ss)
1960         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1961
1962         def count_shares(si):
1963             return len(list(ss._iter_share_files(si)))
1964         def _get_sharefile(si):
1965             return list(ss._iter_share_files(si))[0]
1966         def count_leases(si):
1967             return len(list(_get_sharefile(si).get_leases()))
1968
1969         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1970         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1971         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1972         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1973         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1974         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1975         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1976         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1977
1978         # artificially crank back the expiration time on the first lease of
1979         # each share, to make it look like it was renewed 3000s ago. To achieve
1980         # this, we need to set the expiration time to now-3000+31days. This
1981         # will change when the lease format is improved to contain both
1982         # create/renew time and duration.
1983         new_expiration_time = now - 3000 + 31*24*60*60
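        # (the implied grant/renew time is expiration minus the 31-day
        # default duration, i.e. 3000s ago, which is before the 2000s cutoff)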
1984
1985         # Some shares have an extra lease which is set to expire at the
1986         # default time in 31 days from now (age=31days). We then run the
1987         # crawler, which will expire the first lease, making some shares get
1988         # deleted and others stay alive (with one remaining lease)
1989
1990         sf0 = _get_sharefile(immutable_si_0)
1991         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1992         sf0_size = os.stat(sf0.home).st_size
1993
1994         # immutable_si_1 gets an extra lease
1995         sf1 = _get_sharefile(immutable_si_1)
1996         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1997
1998         sf2 = _get_sharefile(mutable_si_2)
1999         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2000         sf2_size = os.stat(sf2.home).st_size
2001
2002         # mutable_si_3 gets an extra lease
2003         sf3 = _get_sharefile(mutable_si_3)
2004         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2005
2006         ss.setServiceParent(self.s)
2007
2008         d = fireEventually()
2009         # examine the state right after the first bucket has been processed
2010         def _after_first_bucket(ignored):
2011             p = lc.get_progress()
2012             if not p["cycle-in-progress"]:
2013                 d2 = fireEventually()
2014                 d2.addCallback(_after_first_bucket)
2015                 return d2
2016         d.addCallback(_after_first_bucket)
2017         d.addCallback(lambda ign: self.render1(webstatus))
2018         def _check_html_in_cycle(html):
2019             s = remove_tags(html)
2020             # the first bucket encountered gets deleted, and its prefix
2021             # happens to be about 1/5th of the way through the ring, so the
2022             # predictor thinks we'll have 5 shares and that we'll delete them
2023             # all. This part of the test depends upon the SIs landing right
2024             # where they do now.
2025             self.failUnlessIn("The remainder of this cycle is expected to "
2026                               "recover: 4 shares, 4 buckets", s)
2027             self.failUnlessIn("The whole cycle is expected to examine "
2028                               "5 shares in 5 buckets and to recover: "
2029                               "5 shares, 5 buckets", s)
2030         d.addCallback(_check_html_in_cycle)
2031
2032         # wait for the crawler to finish the first cycle. Two shares should
2033         # have been removed
2034         def _wait():
2035             return lc.get_state()["last-cycle-finished"] is not None
2036         d.addCallback(lambda ign: self.poll(_wait))
2037
2038         def _after_first_cycle(ignored):
2039             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2040             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2041             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2042             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2043             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2044             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2045
2046             s = lc.get_state()
2047             last = s["history"][0]
2048
2049             self.failUnlessEqual(last["expiration-enabled"], True)
2050             self.failUnlessEqual(last["configured-expiration-mode"],
2051                                  ("cutoff-date", None, then,
2052                                   ("mutable", "immutable")))
2053             self.failUnlessEqual(last["leases-per-share-histogram"],
2054                                  {1: 2, 2: 2})
2055
2056             rec = last["space-recovered"]
2057             self.failUnlessEqual(rec["examined-buckets"], 4)
2058             self.failUnlessEqual(rec["examined-shares"], 4)
2059             self.failUnlessEqual(rec["actual-buckets"], 2)
2060             self.failUnlessEqual(rec["original-buckets"], 0)
2061             self.failUnlessEqual(rec["configured-buckets"], 2)
2062             self.failUnlessEqual(rec["actual-shares"], 2)
2063             self.failUnlessEqual(rec["original-shares"], 0)
2064             self.failUnlessEqual(rec["configured-shares"], 2)
2065             size = sf0_size + sf2_size
2066             self.failUnlessEqual(rec["actual-sharebytes"], size)
2067             self.failUnlessEqual(rec["original-sharebytes"], 0)
2068             self.failUnlessEqual(rec["configured-sharebytes"], size)
2069             # different platforms have different notions of "blocks used by
2070             # this file", so merely assert that it's a number
2071             self.failUnless(rec["actual-diskbytes"] >= 0,
2072                             rec["actual-diskbytes"])
2073             self.failUnless(rec["original-diskbytes"] >= 0,
2074                             rec["original-diskbytes"])
2075             self.failUnless(rec["configured-diskbytes"] >= 0,
2076                             rec["configured-diskbytes"])
2077         d.addCallback(_after_first_cycle)
2078         d.addCallback(lambda ign: self.render1(webstatus))
2079         def _check_html(html):
2080             s = remove_tags(html)
2081             self.failUnlessIn("Expiration Enabled:"
2082                               " expired leases will be removed", s)
2083             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2084             substr = "Leases created or last renewed before %s will be considered expired." % date
2085             self.failUnlessIn(substr, s)
2086             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2087         d.addCallback(_check_html)
2088         return d
2089
2090     def test_only_immutable(self):
2091         basedir = "storage/LeaseCrawler/only_immutable"
2092         fileutil.make_dirs(basedir)
2093         now = time.time()
2094         then = int(now - 2000)
2095         ss = StorageServer(basedir, "\x00" * 20,
2096                            expiration_enabled=True,
2097                            expiration_mode="cutoff-date",
2098                            expiration_cutoff_date=then,
2099                            expiration_sharetypes=("immutable",))
2100         lc = ss.lease_checker
2101         lc.slow_start = 0
2102         webstatus = StorageStatus(ss)
2103
2104         self.make_shares(ss)
2105         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2106         # set all leases to be expirable
2107         new_expiration_time = now - 3000 + 31*24*60*60
2108
2109         def count_shares(si):
2110             return len(list(ss._iter_share_files(si)))
2111         def _get_sharefile(si):
2112             return list(ss._iter_share_files(si))[0]
2113         def count_leases(si):
2114             return len(list(_get_sharefile(si).get_leases()))
2115
2116         sf0 = _get_sharefile(immutable_si_0)
2117         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2118         sf1 = _get_sharefile(immutable_si_1)
2119         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2120         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2121         sf2 = _get_sharefile(mutable_si_2)
2122         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2123         sf3 = _get_sharefile(mutable_si_3)
2124         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2125         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2126
2127         ss.setServiceParent(self.s)
2128         def _wait():
2129             return lc.get_state()["last-cycle-finished"] is not None
2130         d = self.poll(_wait)
2131
2132         def _after_first_cycle(ignored):
2133             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2134             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2135             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2136             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2137             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2138             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2139         d.addCallback(_after_first_cycle)
2140         d.addCallback(lambda ign: self.render1(webstatus))
2141         def _check_html(html):
2142             s = remove_tags(html)
2143             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2144         d.addCallback(_check_html)
2145         return d
2146
2147     def test_only_mutable(self):
2148         basedir = "storage/LeaseCrawler/only_mutable"
2149         fileutil.make_dirs(basedir)
2150         now = time.time()
2151         then = int(now - 2000)
2152         ss = StorageServer(basedir, "\x00" * 20,
2153                            expiration_enabled=True,
2154                            expiration_mode="cutoff-date",
2155                            expiration_cutoff_date=then,
2156                            expiration_sharetypes=("mutable",))
2157         lc = ss.lease_checker
2158         lc.slow_start = 0
2159         webstatus = StorageStatus(ss)
2160
2161         self.make_shares(ss)
2162         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2163         # set all leases to be expirable
2164         new_expiration_time = now - 3000 + 31*24*60*60
2165
2166         def count_shares(si):
2167             return len(list(ss._iter_share_files(si)))
2168         def _get_sharefile(si):
2169             return list(ss._iter_share_files(si))[0]
2170         def count_leases(si):
2171             return len(list(_get_sharefile(si).get_leases()))
2172
2173         sf0 = _get_sharefile(immutable_si_0)
2174         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2175         sf1 = _get_sharefile(immutable_si_1)
2176         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2177         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2178         sf2 = _get_sharefile(mutable_si_2)
2179         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2180         sf3 = _get_sharefile(mutable_si_3)
2181         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2182         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2183
2184         ss.setServiceParent(self.s)
2185         def _wait():
2186             return lc.get_state()["last-cycle-finished"] is not None
2187         d = self.poll(_wait)
2188
2189         def _after_first_cycle(ignored):
2190             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2191             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2192             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2193             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2194             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2195             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2196         d.addCallback(_after_first_cycle)
2197         d.addCallback(lambda ign: self.render1(webstatus))
2198         def _check_html(html):
2199             s = remove_tags(html)
2200             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2201         d.addCallback(_check_html)
2202         return d
2203
2204     def test_bad_mode(self):
2205         basedir = "storage/LeaseCrawler/bad_mode"
2206         fileutil.make_dirs(basedir)
2207         e = self.failUnlessRaises(ValueError,
2208                                   StorageServer, basedir, "\x00" * 20,
2209                                   expiration_mode="bogus")
2210         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2211
2212     def test_parse_duration(self):
2213         DAY = 24*60*60
2214         MONTH = 31*DAY
2215         YEAR = 365*DAY
2216         p = time_format.parse_duration
2217         self.failUnlessEqual(p("7days"), 7*DAY)
2218         self.failUnlessEqual(p("31day"), 31*DAY)
2219         self.failUnlessEqual(p("60 days"), 60*DAY)
2220         self.failUnlessEqual(p("2mo"), 2*MONTH)
2221         self.failUnlessEqual(p("3 month"), 3*MONTH)
2222         self.failUnlessEqual(p("2years"), 2*YEAR)
2223         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2224         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2225
2226     def test_parse_date(self):
2227         p = time_format.parse_date
2228         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2229         self.failUnlessEqual(p("2009-03-18"), 1237334400)
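        # 1237334400 is 2009-03-18 00:00:00 UTC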
2230
2231     def test_limited_history(self):
2232         basedir = "storage/LeaseCrawler/limited_history"
2233         fileutil.make_dirs(basedir)
2234         ss = StorageServer(basedir, "\x00" * 20)
2235         # make it start sooner than usual.
2236         lc = ss.lease_checker
2237         lc.slow_start = 0
2238         lc.cpu_slice = 500
2239
2240         # create a few shares, with some leases on them
2241         self.make_shares(ss)
2242
2243         ss.setServiceParent(self.s)
2244
2245         def _wait_until_15_cycles_done():
2246             last = lc.state["last-cycle-finished"]
2247             if last is not None and last >= 15:
2248                 return True
2249             if lc.timer:
2250                 lc.timer.reset(0)
2251             return False
2252         d = self.poll(_wait_until_15_cycles_done)
2253
2254         def _check(ignored):
2255             s = lc.get_state()
2256             h = s["history"]
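            # the lease checker retains only the 10 most recent cycles of
            # history, so after cycle 15 we expect cycles 6..15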
2257             self.failUnlessEqual(len(h), 10)
2258             self.failUnlessEqual(max(h.keys()), 15)
2259             self.failUnlessEqual(min(h.keys()), 6)
2260         d.addCallback(_check)
2261         return d
2262
2263     def test_unpredictable_future(self):
2264         basedir = "storage/LeaseCrawler/unpredictable_future"
2265         fileutil.make_dirs(basedir)
2266         ss = StorageServer(basedir, "\x00" * 20)
2267         # make it start sooner than usual.
2268         lc = ss.lease_checker
2269         lc.slow_start = 0
2270         lc.cpu_slice = -1.0 # stop quickly
2271
2272         self.make_shares(ss)
2273
2274         ss.setServiceParent(self.s)
2275
2276         d = fireEventually()
2277         def _check(ignored):
2278             # this should fire after the first bucket is complete, but before
2279             # the first prefix is complete, so the progress-measurer won't
2280             # think we've gotten far enough to raise our percent-complete
2281             # above 0%, triggering the cannot-predict-the-future code in
2282             # expirer.py . This will have to change if/when the
2283             # progress-measurer gets smart enough to count buckets (we'll
2284             # have to interrupt it even earlier, before it's finished the
2285             # first bucket).
2286             s = lc.get_state()
2287             if "cycle-to-date" not in s:
2288                 d2 = fireEventually()
2289                 d2.addCallback(_check)
2290                 return d2
2291             self.failUnlessIn("cycle-to-date", s)
2292             self.failUnlessIn("estimated-remaining-cycle", s)
2293             self.failUnlessIn("estimated-current-cycle", s)
2294
2295             left = s["estimated-remaining-cycle"]["space-recovered"]
2296             self.failUnlessEqual(left["actual-buckets"], None)
2297             self.failUnlessEqual(left["original-buckets"], None)
2298             self.failUnlessEqual(left["configured-buckets"], None)
2299             self.failUnlessEqual(left["actual-shares"], None)
2300             self.failUnlessEqual(left["original-shares"], None)
2301             self.failUnlessEqual(left["configured-shares"], None)
2302             self.failUnlessEqual(left["actual-diskbytes"], None)
2303             self.failUnlessEqual(left["original-diskbytes"], None)
2304             self.failUnlessEqual(left["configured-diskbytes"], None)
2305             self.failUnlessEqual(left["actual-sharebytes"], None)
2306             self.failUnlessEqual(left["original-sharebytes"], None)
2307             self.failUnlessEqual(left["configured-sharebytes"], None)
2308
2309             full = s["estimated-current-cycle"]["space-recovered"]
2310             self.failUnlessEqual(full["actual-buckets"], None)
2311             self.failUnlessEqual(full["original-buckets"], None)
2312             self.failUnlessEqual(full["configured-buckets"], None)
2313             self.failUnlessEqual(full["actual-shares"], None)
2314             self.failUnlessEqual(full["original-shares"], None)
2315             self.failUnlessEqual(full["configured-shares"], None)
2316             self.failUnlessEqual(full["actual-diskbytes"], None)
2317             self.failUnlessEqual(full["original-diskbytes"], None)
2318             self.failUnlessEqual(full["configured-diskbytes"], None)
2319             self.failUnlessEqual(full["actual-sharebytes"], None)
2320             self.failUnlessEqual(full["original-sharebytes"], None)
2321             self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
        return d

    def test_no_st_blocks(self):
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
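        # (No_ST_BLOCKS_StorageServer, defined elsewhere in this module,
        # presumably stubs out the .st_blocks attribute of os.stat()
        # results, simulating a platform whose stat() doesn't provide it)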
        # a negative expiration_override_lease_duration= means every lease
        # will have expired before the crawler examines it, so the
        # "configured-" space-recovered counts will be non-zero

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
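            # (when st_blocks is available, diskbytes is instead derived
            # from the allocated block count, conventionally st_blocks * 512,
            # and can therefore differ from the logical share size)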
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.
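        # (share containers are recognized by a fixed magic string at
        # offset 0; overwriting the first nine bytes with "BAD MAGIC" is
        # enough to make the file match neither container format)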

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)
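        # (the crawler counts this share-less directory as a bucket too,
        # which is why the full-cycle check below expects
        # examined-buckets == 5)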

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
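            # of the five buckets (four shares plus the empty one), only
            # three shares could be examined: the corrupted share is
            # reported under corrupt-shares instead of examined-shares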
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        return self.render1(page, args={"t": ["json"]})


    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_no_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = AttributeError()
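        # (AttributeError models a platform that lacks the underlying stats
        # call entirely, e.g. an os module without statvfs; presumably
        # fileutil.get_disk_stats would fail this way on such a platform)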

        # Some platforms may have no disk stats API. Make sure the code can
        # handle that (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_bad_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = OSError()

        # If the API to get disk stats exists but a call to it fails, then
        # the status should show that no shares will be accepted, and
        # get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_right_disk_stats(self, mock_get_disk_stats):
        GB = 1000000000
        total            = 5*GB
        free_for_root    = 4*GB
        free_for_nonroot = 3*GB
        reserved_space   = 1*GB
        used = total - free_for_root
        avail = max(free_for_nonroot - reserved_space, 0)
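        # so the status page should report 5.00 GB total, 1.00 GB used
        # (total minus free-for-root), and 2.00 GB available to Tahoe
        # (3 GB free-for-nonroot minus the 1 GB reservation)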
        mock_get_disk_stats.return_value = {
            'total': total,
            'free_for_root': free_for_root,
            'free_for_nonroot': free_for_nonroot,
            'used': used,
            'avail': avail,
        }

        basedir = "storage/WebStatus/status_right_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
        expecteddir = ss.sharedir
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()

        self.failIf([True for args in mock_get_disk_stats.call_args_list
                     if args != ((expecteddir, reserved_space), {})],
                    mock_get_disk_stats.call_args_list)

        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Total disk space: 5.00 GB", s)
        self.failUnlessIn("Disk space used: - 1.00 GB", s)
        self.failUnlessIn("Disk space free (root): 4.00 GB", s)
        self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
        self.failUnlessIn("Reserved space: - 1.00 GB", s)
        self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
        self.failUnlessEqual(ss.get_available_space(), 2*GB)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        # use a reservation too large to fit in a 32-bit int, to make sure
        # large values render correctly
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=15e9)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 15.00 GB (15000000000)", s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)