import time, os.path, platform, stat, re, simplejson, struct

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

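# These tests exercise the storage backend directly: the immutable
# BucketWriter/BucketReader pair, the WriteBucketProxy and ReadBucketProxy
# share layouts, StorageServer's immutable-share API, and the mutable-slot
# (testv-and-readv-and-writev) API. The small classes that follow are test
# doubles: FakeCanary implements just enough of foolscap's canary interface
# for BucketWriter, and FakeStatsProvider swallows the stats calls that
# StorageServer makes.
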
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

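# In the Bucket and BucketProxy tests below, the TestCase instance itself
# is handed to BucketWriter/BucketReader in place of the storage server,
# which is why those classes provide bucket_writer_closed(), add_latency(),
# and count().
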
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

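# RemoteBucket (above) stands in for a foolscap RemoteReference: it turns
# callRemote("foo", ...) into a maybeDeferred call of target.remote_foo(...),
# letting the proxy tests below run without any networking.
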
class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

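        # (Checking that arithmetic: 0x24 = 36, and 36 + 100 + 3*224 + 102
        # + 504 = 1414; with the v2 header, 0x44 = 68 gives 1446.)
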
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems that lack efficient sparse-file support (Mac OS X and Windows are among them).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an existing share's (but is not
        exactly the same storage index) does not add a new entry to the
        share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        # fetching buckets for an unrelated (nonexistent) storage index
        # still works, despite the corrupt si1 share
        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
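        # (OVERHEAD is presumably the three 4-byte fields of the share-file
        # header, 12 bytes; LEASE_SIZE appears to match LeaseInfo's on-disk
        # layout of a 4-byte owner number, two 32-byte secrets, and a 4-byte
        # expiration time, 72 bytes.)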
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
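        # each share's entry is a (test_vector, write_vector, new_length)
        # tuple; empty vectors with new_length=None simply create the share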
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
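        # (test_remove, below, relies on this delete-at-size-zero behavior.)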
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
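        # The vectors below exercise each comparison operator -- lt, le,
        # eq, ne, ge, gt -- against the bytes "11111" at offset 10, with
        # both failing and passing comparands for each.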
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
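        # each distinct n yields a fresh (renew, cancel) secret pair under
        # the same write enabler, so writing with secrets(n) for a new n
        # adds a lease instead of renewing an existing one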
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
1280         # and the bucket directory should now be gone
1281         si = base32.b2a("si1")
1282         # note: this is a detail of the storage server implementation, and
1283         # may change in the future
1284         prefix = si[:2]
1285         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1286         bucketdir = os.path.join(prefixdir, si)
1287         self.failUnless(os.path.exists(prefixdir), prefixdir)
1288         self.failIf(os.path.exists(bucketdir), bucketdir)
1289
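# Editor's sketch (hedged, not part of the server API): the share layout
# probed at the end of test_remove. Shares for a storage index live under
# <workdir>/shares/<prefix>/<si_b32>/, where the prefix is the first two
# characters of the base32-encoded storage index -- the same mapping that
# storage_index_to_dir() provides, and (as noted above) an implementation
# detail that may change.
def sketch_bucket_dir(workdir, storage_index):
    si_b32 = base32.b2a(storage_index)
    return os.path.join(workdir, "shares", si_b32[:2], si_b32)
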
1290 class Stats(unittest.TestCase):
1291
1292     def setUp(self):
1293         self.sparent = LoggingServiceParent()
1294         self._lease_secret = itertools.count()
1295     def tearDown(self):
1296         return self.sparent.stopService()
1297
1298     def workdir(self, name):
1299         basedir = os.path.join("storage", "Server", name)
1300         return basedir
1301
1302     def create(self, name):
1303         workdir = self.workdir(name)
1304         ss = StorageServer(workdir, "\x00" * 20)
1305         ss.setServiceParent(self.sparent)
1306         return ss
1307
1308     def test_latencies(self):
1309         ss = self.create("test_latencies")
1310         for i in range(10000):
1311             ss.add_latency("allocate", 1.0 * i)
1312         for i in range(1000):
1313             ss.add_latency("renew", 1.0 * i)
1314         for i in range(20):
1315             ss.add_latency("write", 1.0 * i)
1316         for i in range(10):
1317             ss.add_latency("cancel", 2.0 * i)
1318         ss.add_latency("get", 5.0)
1319
1320         output = ss.get_latencies()
1321
1322         self.failUnlessEqual(sorted(output.keys()),
1323                              sorted(["allocate", "renew", "cancel", "write", "get"]))
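        # add_latency retains only the most recent 1000 samples per category,
        # so the 10000 "allocate" events are trimmed to the values 9000..9999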
1324         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1325         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
1326         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
1327         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
1328         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
1329         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
1330         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
1331         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
1332         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
1333
1334         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1335         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
1336         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
1337         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
1338         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
1339         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
1340         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
1341         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
1342         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
1343
1344         self.failUnlessEqual(len(ss.latencies["write"]), 20)
1345         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
1346         self.failUnless(output["write"]["01_0_percentile"] is None, output)
1347         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
1348         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
1349         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
1350         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
1351         self.failUnless(output["write"]["99_0_percentile"] is None, output)
1352         self.failUnless(output["write"]["99_9_percentile"] is None, output)
1353
1354         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1355         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
1356         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
1357         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
1358         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
1359         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
1360         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
1361         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
1362         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
1363
1364         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1365         self.failUnless(output["get"]["mean"] is None, output)
1366         self.failUnless(output["get"]["01_0_percentile"] is None, output)
1367         self.failUnless(output["get"]["10_0_percentile"] is None, output)
1368         self.failUnless(output["get"]["50_0_percentile"] is None, output)
1369         self.failUnless(output["get"]["90_0_percentile"] is None, output)
1370         self.failUnless(output["get"]["95_0_percentile"] is None, output)
1371         self.failUnless(output["get"]["99_0_percentile"] is None, output)
1372         self.failUnless(output["get"]["99_9_percentile"] is None, output)
1373
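# Editor's sketch (hedged): a model of the percentile gating that the
# assertions in test_latencies imply -- not the server implementation itself.
# A percentile is reported only when enough samples exist for that order
# statistic to be meaningful (10 for the 10th/50th/90th, 20 for the 95th,
# 100 for the 1st and 99th, 1000 for the 99.9th); otherwise it is None, and
# a mean needs at least two samples.
def sketch_latency_stats(samples):
    samples = sorted(samples)
    n = len(samples)
    stats = {}
    stats["mean"] = sum(samples) / float(n) if n > 1 else None
    order_stats = [(0.01, "01_0_percentile", 100),
                   (0.10, "10_0_percentile", 10),
                   (0.50, "50_0_percentile", 10),
                   (0.90, "90_0_percentile", 10),
                   (0.95, "95_0_percentile", 20),
                   (0.99, "99_0_percentile", 100),
                   (0.999, "99_9_percentile", 1000)]
    for fraction, name, min_samples in order_stats:
        # with n sorted samples, the p'th percentile is the value at
        # index int(p*n)
        stats[name] = samples[int(fraction * n)] if n >= min_samples else None
    return stats
# e.g. sketch_latency_stats([2.0 * i for i in range(10)]) reports a 90th
# percentile of 18.0 but None for the 95th, matching the "cancel" case above.
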
1374 def remove_tags(s):
1375     s = re.sub(r'<[^>]*>', ' ', s)
1376     s = re.sub(r'\s+', ' ', s)
1377     return s
1378
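# Editor's example: remove_tags() collapses markup runs to single spaces, so
#   remove_tags("<p>Total <b>buckets</b>: 0</p>") == " Total buckets : 0 "
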
1379 class MyBucketCountingCrawler(BucketCountingCrawler):
1380     def finished_prefix(self, cycle, prefix):
1381         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1382         if self.hook_ds:
1383             d = self.hook_ds.pop(0)
1384             d.callback(None)
1385
1386 class MyStorageServer(StorageServer):
1387     def add_bucket_counter(self):
1388         statefile = os.path.join(self.storedir, "bucket_counter.state")
1389         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
1390         self.bucket_counter.setServiceParent(self)
1391
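# Editor's note (hedged): the two subclasses above form a small
# instrumentation pattern: a test assigns a list of Deferreds to hook_ds, and
# finished_prefix() pops and fires one per completed prefix, so the test can
# run assertions at exact points in the crawl instead of polling. See
# test_bucket_counter_eta below, e.g.:
#
#   hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
#   hooks[0].addCallback(_check_after_first_prefix)  # hypothetical callback
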
1392 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1393
1394     def setUp(self):
1395         self.s = service.MultiService()
1396         self.s.startService()
1397     def tearDown(self):
1398         return self.s.stopService()
1399
1400     def test_bucket_counter(self):
1401         basedir = "storage/BucketCounter/bucket_counter"
1402         fileutil.make_dirs(basedir)
1403         ss = StorageServer(basedir, "\x00" * 20)
1404         # to make sure we capture the bucket-counting-crawler in the middle
1405         # of a cycle, we reach in and reduce its maximum slice time to 0. We
1406         # also make it start sooner than usual.
1407         ss.bucket_counter.slow_start = 0
1408         orig_cpu_slice = ss.bucket_counter.cpu_slice
1409         ss.bucket_counter.cpu_slice = 0
1410         ss.setServiceParent(self.s)
1411
1412         w = StorageStatus(ss)
1413
1414         # this sample is before the crawler has started doing anything
1415         html = w.renderSynchronously()
1416         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
1417         s = remove_tags(html)
1418         self.failUnlessIn("Accepting new shares: Yes", s)
1419         self.failUnlessIn("Reserved space: - 0 B (0)", s)
1420         self.failUnlessIn("Total buckets: Not computed yet", s)
1421         self.failUnlessIn("Next crawl in", s)
1422
1423         # give the bucket-counting-crawler one tick to get started. The
1424         # cpu_slice=0 will force it to yield right after it processes the
1425         # first prefix
1426
1427         d = fireEventually()
1428         def _check(ignored):
1429             # are we really right after the first prefix?
1430             state = ss.bucket_counter.get_state()
1431             if state["last-complete-prefix"] is None:
1432                 d2 = fireEventually()
1433                 d2.addCallback(_check)
1434                 return d2
1435             self.failUnlessEqual(state["last-complete-prefix"],
1436                                  ss.bucket_counter.prefixes[0])
1437             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1438             html = w.renderSynchronously()
1439             s = remove_tags(html)
1440             self.failUnlessIn(" Current crawl ", s)
1441             self.failUnlessIn(" (next work in ", s)
1442         d.addCallback(_check)
1443
1444         # now give it enough time to complete a full cycle
1445         def _watch():
1446             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1447         d.addCallback(lambda ignored: self.poll(_watch))
1448         def _check2(ignored):
1449             ss.bucket_counter.cpu_slice = orig_cpu_slice
1450             html = w.renderSynchronously()
1451             s = remove_tags(html)
1452             self.failUnlessIn("Total buckets: 0 (the number of", s)
1453             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
1454         d.addCallback(_check2)
1455         return d
1456
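    # Editor's note (hedged): _check above re-arms itself with
    # fireEventually() until the crawler has recorded a finished prefix.
    # This poll-by-rescheduling idiom recurs throughout this file:
    #
    #   def _poll(ignored):
    #       if not condition_met():        # condition_met() is hypothetical
    #           d2 = fireEventually()
    #           d2.addCallback(_poll)
    #           return d2
    #       # ...assertions run once the condition holds...
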
1457     def test_bucket_counter_cleanup(self):
1458         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1459         fileutil.make_dirs(basedir)
1460         ss = StorageServer(basedir, "\x00" * 20)
1461         # to make sure we capture the bucket-counting-crawler in the middle
1462         # of a cycle, we reach in and reduce its maximum slice time to 0.
1463         ss.bucket_counter.slow_start = 0
1464         orig_cpu_slice = ss.bucket_counter.cpu_slice
1465         ss.bucket_counter.cpu_slice = 0
1466         ss.setServiceParent(self.s)
1467
1468         d = fireEventually()
1469
1470         def _after_first_prefix(ignored):
1471             state = ss.bucket_counter.state
1472             if state["last-complete-prefix"] is None:
1473                 d2 = fireEventually()
1474                 d2.addCallback(_after_first_prefix)
1475                 return d2
1476             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1477             # now sneak in and mess with its state, to make sure it cleans up
1478             # properly at the end of the cycle
1479             self.failUnlessEqual(state["last-complete-prefix"],
1480                                  ss.bucket_counter.prefixes[0])
1481             state["bucket-counts"][-12] = {}
1482             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1483             ss.bucket_counter.save_state()
1484         d.addCallback(_after_first_prefix)
1485
1486         # now give it enough time to complete a cycle
1487         def _watch():
1488             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1489         d.addCallback(lambda ignored: self.poll(_watch))
1490         def _check2(ignored):
1491             ss.bucket_counter.cpu_slice = orig_cpu_slice
1492             s = ss.bucket_counter.get_state()
1493             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1494             self.failIf("bogusprefix!" in s["storage-index-samples"],
1495                         s["storage-index-samples"].keys())
1496         d.addCallback(_check2)
1497         return d
1498
1499     def test_bucket_counter_eta(self):
1500         basedir = "storage/BucketCounter/bucket_counter_eta"
1501         fileutil.make_dirs(basedir)
1502         ss = MyStorageServer(basedir, "\x00" * 20)
1503         ss.bucket_counter.slow_start = 0
1504         # these will be fired inside finished_prefix()
1505         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1506         w = StorageStatus(ss)
1507
1508         d = defer.Deferred()
1509
1510         def _check_1(ignored):
1511             # no ETA is available yet
1512             html = w.renderSynchronously()
1513             s = remove_tags(html)
1514             self.failUnlessIn("complete (next work", s)
1515
1516         def _check_2(ignored):
1517             # one prefix has finished, so an ETA based upon that elapsed time
1518             # should be available.
1519             html = w.renderSynchronously()
1520             s = remove_tags(html)
1521             self.failUnlessIn("complete (ETA ", s)
1522
1523         def _check_3(ignored):
1524             # two prefixes have finished
1525             html = w.renderSynchronously()
1526             s = remove_tags(html)
1527             self.failUnlessIn("complete (ETA ", s)
1528             d.callback("done")
1529
1530         hooks[0].addCallback(_check_1).addErrback(d.errback)
1531         hooks[1].addCallback(_check_2).addErrback(d.errback)
1532         hooks[2].addCallback(_check_3).addErrback(d.errback)
1533
1534         ss.setServiceParent(self.s)
1535         return d
1536
1537 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
1538     stop_after_first_bucket = False
1539     def process_bucket(self, *args, **kwargs):
1540         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
1541         if self.stop_after_first_bucket:
1542             self.stop_after_first_bucket = False
1543             self.cpu_slice = -1.0
1544     def yielding(self, sleep_time):
1545         if not self.stop_after_first_bucket:
1546             self.cpu_slice = 500
1547
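# Editor's note (hedged): cpu_slice is the crawler's per-reactor-turn time
# budget. Forcing it to -1.0 makes the crawler yield right after the current
# bucket, freezing mid-cycle state for inspection, while a large value such
# as 500 lets the rest of the cycle finish within a single slice.
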
1548 class BrokenStatResults:
1549     pass
1550 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1551     def stat(self, fn):
1552         s = os.stat(fn)
1553         bsr = BrokenStatResults()
1554         for attrname in dir(s):
1555             if attrname.startswith("_"):
1556                 continue
1557             if attrname == "st_blocks":
1558                 continue
1559             setattr(bsr, attrname, getattr(s, attrname))
1560         return bsr
1561
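# Editor's note (hedged): where stat results carry st_blocks, the lease
# checker can compute "diskbytes" from actually-allocated blocks; stripping
# the field (as above) exercises the fallback in which diskbytes is expected
# to equal sharebytes -- see test_no_st_blocks below.
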
1562 class InstrumentedStorageServer(StorageServer):
1563     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1564 class No_ST_BLOCKS_StorageServer(StorageServer):
1565     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1566
1567 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1568
1569     def setUp(self):
1570         self.s = service.MultiService()
1571         self.s.startService()
1572     def tearDown(self):
1573         return self.s.stopService()
1574
1575     def make_shares(self, ss):
1576         def make(si):
1577             return (si, hashutil.tagged_hash("renew", si),
1578                     hashutil.tagged_hash("cancel", si))
1579         def make_mutable(si):
1580             return (si, hashutil.tagged_hash("renew", si),
1581                     hashutil.tagged_hash("cancel", si),
1582                     hashutil.tagged_hash("write-enabler", si))
1583         def make_extra_lease(si, num):
1584             return (hashutil.tagged_hash("renew-%d" % num, si),
1585                     hashutil.tagged_hash("cancel-%d" % num, si))
1586
1587         immutable_si_0, rs0, cs0 = make("\x00" * 16)
1588         immutable_si_1, rs1, cs1 = make("\x01" * 16)
1589         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1590         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1591         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1592         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1593         sharenums = [0]
1594         canary = FakeCanary()
1595         # note: 'tahoe debug dump-share' will not handle this file, since the
1596         # inner contents are not a valid CHK share
1597         data = "\xff" * 1000
1598
1599         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1600                                          1000, canary)
1601         w[0].remote_write(0, data)
1602         w[0].remote_close()
1603
1604         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1605                                          1000, canary)
1606         w[0].remote_write(0, data)
1607         w[0].remote_close()
1608         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1609
1610         writev = ss.remote_slot_testv_and_readv_and_writev
1611         writev(mutable_si_2, (we2, rs2, cs2),
1612                {0: ([], [(0,data)], len(data))}, [])
1613         writev(mutable_si_3, (we3, rs3, cs3),
1614                {0: ([], [(0,data)], len(data))}, [])
1615         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1616
1617         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1618         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1619         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1620
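    # Editor's note (hedged): deriving each secret as a tagged hash of the
    # storage index makes the secrets deterministic per (tag, SI) pair, so
    # make_shares() can hand them out via self.renew_secrets and
    # self.cancel_secrets and the tests can recompute or reuse them freely.
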
1621     def test_basic(self):
1622         basedir = "storage/LeaseCrawler/basic"
1623         fileutil.make_dirs(basedir)
1624         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1625         # make it start sooner than usual.
1626         lc = ss.lease_checker
1627         lc.slow_start = 0
1628         lc.cpu_slice = 500
1629         lc.stop_after_first_bucket = True
1630         webstatus = StorageStatus(ss)
1631
1632         # create a few shares, with some leases on them
1633         self.make_shares(ss)
1634         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1635
1636         # add a non-sharefile to exercise another code path
1637         fn = os.path.join(ss.sharedir,
1638                           storage_index_to_dir(immutable_si_0),
1639                           "not-a-share")
1640         f = open(fn, "wb")
1641         f.write("I am not a share.\n")
1642         f.close()
1643
1644         # this is before the crawl has started, so we're not in a cycle yet
1645         initial_state = lc.get_state()
1646         self.failIf(lc.get_progress()["cycle-in-progress"])
1647         self.failIfIn("cycle-to-date", initial_state)
1648         self.failIfIn("estimated-remaining-cycle", initial_state)
1649         self.failIfIn("estimated-current-cycle", initial_state)
1650         self.failUnlessIn("history", initial_state)
1651         self.failUnlessEqual(initial_state["history"], {})
1652
1653         ss.setServiceParent(self.s)
1654
1655         DAY = 24*60*60
1656
1657         d = fireEventually()
1658
1659         # now examine the state right after the first bucket has been
1660         # processed.
1661         def _after_first_bucket(ignored):
1662             initial_state = lc.get_state()
1663             if "cycle-to-date" not in initial_state:
1664                 d2 = fireEventually()
1665                 d2.addCallback(_after_first_bucket)
1666                 return d2
1667             self.failUnlessIn("cycle-to-date", initial_state)
1668             self.failUnlessIn("estimated-remaining-cycle", initial_state)
1669             self.failUnlessIn("estimated-current-cycle", initial_state)
1670             self.failUnlessIn("history", initial_state)
1671             self.failUnlessEqual(initial_state["history"], {})
1672
1673             so_far = initial_state["cycle-to-date"]
1674             self.failUnlessEqual(so_far["expiration-enabled"], False)
1675             self.failUnlessIn("configured-expiration-mode", so_far)
1676             self.failUnlessIn("lease-age-histogram", so_far)
1677             lah = so_far["lease-age-histogram"]
1678             self.failUnlessEqual(type(lah), list)
1679             self.failUnlessEqual(len(lah), 1)
1680             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
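            # each histogram entry appears to be a (min-age, max-age, count)
            # bucket; the one lease seen so far lands in the first one-day
            # bucket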
1681             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1682             self.failUnlessEqual(so_far["corrupt-shares"], [])
1683             sr1 = so_far["space-recovered"]
1684             self.failUnlessEqual(sr1["examined-buckets"], 1)
1685             self.failUnlessEqual(sr1["examined-shares"], 1)
1686             self.failUnlessEqual(sr1["actual-shares"], 0)
1687             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1688             self.failUnlessEqual(sr1["original-sharebytes"], 0)
1689             left = initial_state["estimated-remaining-cycle"]
1690             sr2 = left["space-recovered"]
1691             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1692             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1693             self.failIfEqual(sr2["actual-shares"], None)
1694             self.failIfEqual(sr2["configured-diskbytes"], None)
1695             self.failIfEqual(sr2["original-sharebytes"], None)
1696         d.addCallback(_after_first_bucket)
1697         d.addCallback(lambda ign: self.render1(webstatus))
1698         def _check_html_in_cycle(html):
1699             s = remove_tags(html)
1700             self.failUnlessIn("So far, this cycle has examined "
1701                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1702             self.failUnlessIn("and has recovered: "
1703                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
1704                               "0 B (0 B / 0 B)", s)
1705             self.failUnlessIn("If expiration were enabled, "
1706                               "we would have recovered: "
1707                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1708                               " 0 B (0 B / 0 B) by now", s)
1709             self.failUnlessIn("and the remainder of this cycle "
1710                               "would probably recover: "
1711                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1712                               " 0 B (0 B / 0 B)", s)
1713             self.failUnlessIn("and the whole cycle would probably recover: "
1714                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
1715                               " 0 B (0 B / 0 B)", s)
1716             self.failUnlessIn("if we were strictly using each lease's default "
1717                               "31-day lease lifetime", s)
1718             self.failUnlessIn("this cycle would be expected to recover: ", s)
1719         d.addCallback(_check_html_in_cycle)
1720
1721         # wait for the crawler to finish the first cycle. Nothing should have
1722         # been removed.
1723         def _wait():
1724             return lc.get_state()["last-cycle-finished"] is not None
1725         d.addCallback(lambda ign: self.poll(_wait))
1726
1727         def _after_first_cycle(ignored):
1728             s = lc.get_state()
1729             self.failIf("cycle-to-date" in s)
1730             self.failIf("estimated-remaining-cycle" in s)
1731             self.failIf("estimated-current-cycle" in s)
1732             last = s["history"][0]
1733             self.failUnlessIn("cycle-start-finish-times", last)
1734             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1735             self.failUnlessEqual(last["expiration-enabled"], False)
1736             self.failUnlessIn("configured-expiration-mode", last)
1737
1738             self.failUnlessIn("lease-age-histogram", last)
1739             lah = last["lease-age-histogram"]
1740             self.failUnlessEqual(type(lah), list)
1741             self.failUnlessEqual(len(lah), 1)
1742             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1743
1744             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1745             self.failUnlessEqual(last["corrupt-shares"], [])
1746
1747             rec = last["space-recovered"]
1748             self.failUnlessEqual(rec["examined-buckets"], 4)
1749             self.failUnlessEqual(rec["examined-shares"], 4)
1750             self.failUnlessEqual(rec["actual-buckets"], 0)
1751             self.failUnlessEqual(rec["original-buckets"], 0)
1752             self.failUnlessEqual(rec["configured-buckets"], 0)
1753             self.failUnlessEqual(rec["actual-shares"], 0)
1754             self.failUnlessEqual(rec["original-shares"], 0)
1755             self.failUnlessEqual(rec["configured-shares"], 0)
1756             self.failUnlessEqual(rec["actual-diskbytes"], 0)
1757             self.failUnlessEqual(rec["original-diskbytes"], 0)
1758             self.failUnlessEqual(rec["configured-diskbytes"], 0)
1759             self.failUnlessEqual(rec["actual-sharebytes"], 0)
1760             self.failUnlessEqual(rec["original-sharebytes"], 0)
1761             self.failUnlessEqual(rec["configured-sharebytes"], 0)
1762
1763             def _get_sharefile(si):
1764                 return list(ss._iter_share_files(si))[0]
1765             def count_leases(si):
1766                 return len(list(_get_sharefile(si).get_leases()))
1767             self.failUnlessEqual(count_leases(immutable_si_0), 1)
1768             self.failUnlessEqual(count_leases(immutable_si_1), 2)
1769             self.failUnlessEqual(count_leases(mutable_si_2), 1)
1770             self.failUnlessEqual(count_leases(mutable_si_3), 2)
1771         d.addCallback(_after_first_cycle)
1772         d.addCallback(lambda ign: self.render1(webstatus))
1773         def _check_html(html):
1774             s = remove_tags(html)
1775             self.failUnlessIn("recovered: 0 shares, 0 buckets "
1776                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
1777             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
1778                               "(2 mutable / 2 immutable),", s)
1779             self.failUnlessIn("but expiration was not enabled", s)
1780         d.addCallback(_check_html)
1781         d.addCallback(lambda ign: self.render_json(webstatus))
1782         def _check_json(json):
1783             data = simplejson.loads(json)
1784             self.failUnlessIn("lease-checker", data)
1785             self.failUnlessIn("lease-checker-progress", data)
1786         d.addCallback(_check_json)
1787         return d
1788
1789     def backdate_lease(self, sf, renew_secret, new_expire_time):
1790         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1791         # "renew" a lease with a new_expire_time that is older than what the
1792         # current lease has), so we have to reach inside it.
1793         for i,lease in enumerate(sf.get_leases()):
1794             if lease.renew_secret == renew_secret:
1795                 lease.expiration_time = new_expire_time
1796                 f = open(sf.home, 'rb+')
1797                 sf._write_lease_record(f, i, lease)
1798                 f.close()
1799                 return
1800         raise IndexError("unable to renew non-existent lease")
1801
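    # Typical use (see the tests below): age a lease so the expirer treats it
    # as already expired, e.g.
    #   self.backdate_lease(sf0, self.renew_secrets[0], time.time() - 1000)
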
1802     def test_expire_age(self):
1803         basedir = "storage/LeaseCrawler/expire_age"
1804         fileutil.make_dirs(basedir)
1805         # setting expiration_override_lease_duration to 2000 means that any
1806         # lease which is more than 2000s old will be expired.
1807         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1808                                        expiration_enabled=True,
1809                                        expiration_mode="age",
1810                                        expiration_override_lease_duration=2000)
1811         # make it start sooner than usual.
1812         lc = ss.lease_checker
1813         lc.slow_start = 0
1814         lc.stop_after_first_bucket = True
1815         webstatus = StorageStatus(ss)
1816
1817         # create a few shares, with some leases on them
1818         self.make_shares(ss)
1819         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1820
1821         def count_shares(si):
1822             return len(list(ss._iter_share_files(si)))
1823         def _get_sharefile(si):
1824             return list(ss._iter_share_files(si))[0]
1825         def count_leases(si):
1826             return len(list(_get_sharefile(si).get_leases()))
1827
1828         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1829         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1830         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1831         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1832         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1833         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1834         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1835         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1836
1837         # artificially crank back the expiration time on the first lease of
1838         # each share, to make it look like it expired already (age=1000s).
1839         # Some shares have an extra lease which is set to expire at the
1840         # default time in 31 days from now (age=31days). We then run the
1841         # crawler, which will expire the first lease, so some shares get
1842         # deleted and others stay alive (with one remaining lease).
1843         now = time.time()
1844
1845         sf0 = _get_sharefile(immutable_si_0)
1846         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1847         sf0_size = os.stat(sf0.home).st_size
1848
1849         # immutable_si_1 gets an extra lease
1850         sf1 = _get_sharefile(immutable_si_1)
1851         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1852
1853         sf2 = _get_sharefile(mutable_si_2)
1854         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1855         sf2_size = os.stat(sf2.home).st_size
1856
1857         # mutable_si_3 gets an extra lease
1858         sf3 = _get_sharefile(mutable_si_3)
1859         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1860
1861         ss.setServiceParent(self.s)
1862
1863         d = fireEventually()
1864         # examine the state right after the first bucket has been processed
1865         def _after_first_bucket(ignored):
1866             p = lc.get_progress()
1867             if not p["cycle-in-progress"]:
1868                 d2 = fireEventually()
1869                 d2.addCallback(_after_first_bucket)
1870                 return d2
1871         d.addCallback(_after_first_bucket)
1872         d.addCallback(lambda ign: self.render1(webstatus))
1873         def _check_html_in_cycle(html):
1874             s = remove_tags(html)
1875             # the first bucket encountered gets deleted, and its prefix
1876             # happens to be about 1/5th of the way through the ring, so the
1877             # predictor thinks we'll have 5 shares and that we'll delete them
1878             # all. This part of the test depends upon the SIs landing right
1879             # where they do now.
1880             self.failUnlessIn("The remainder of this cycle is expected to "
1881                               "recover: 4 shares, 4 buckets", s)
1882             self.failUnlessIn("The whole cycle is expected to examine "
1883                               "5 shares in 5 buckets and to recover: "
1884                               "5 shares, 5 buckets", s)
1885         d.addCallback(_check_html_in_cycle)
1886
1887         # wait for the crawler to finish the first cycle. Two shares should
1888         # have been removed
1889         def _wait():
1890             return lc.get_state()["last-cycle-finished"] is not None
1891         d.addCallback(lambda ign: self.poll(_wait))
1892
1893         def _after_first_cycle(ignored):
1894             self.failUnlessEqual(count_shares(immutable_si_0), 0)
1895             self.failUnlessEqual(count_shares(immutable_si_1), 1)
1896             self.failUnlessEqual(count_leases(immutable_si_1), 1)
1897             self.failUnlessEqual(count_shares(mutable_si_2), 0)
1898             self.failUnlessEqual(count_shares(mutable_si_3), 1)
1899             self.failUnlessEqual(count_leases(mutable_si_3), 1)
1900
1901             s = lc.get_state()
1902             last = s["history"][0]
1903
1904             self.failUnlessEqual(last["expiration-enabled"], True)
1905             self.failUnlessEqual(last["configured-expiration-mode"],
1906                                  ("age", 2000, None, ("mutable", "immutable")))
1907             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1908
1909             rec = last["space-recovered"]
1910             self.failUnlessEqual(rec["examined-buckets"], 4)
1911             self.failUnlessEqual(rec["examined-shares"], 4)
1912             self.failUnlessEqual(rec["actual-buckets"], 2)
1913             self.failUnlessEqual(rec["original-buckets"], 2)
1914             self.failUnlessEqual(rec["configured-buckets"], 2)
1915             self.failUnlessEqual(rec["actual-shares"], 2)
1916             self.failUnlessEqual(rec["original-shares"], 2)
1917             self.failUnlessEqual(rec["configured-shares"], 2)
1918             size = sf0_size + sf2_size
1919             self.failUnlessEqual(rec["actual-sharebytes"], size)
1920             self.failUnlessEqual(rec["original-sharebytes"], size)
1921             self.failUnlessEqual(rec["configured-sharebytes"], size)
1922             # different platforms have different notions of "blocks used by
1923             # this file", so merely assert that it's a number
1924             self.failUnless(rec["actual-diskbytes"] >= 0,
1925                             rec["actual-diskbytes"])
1926             self.failUnless(rec["original-diskbytes"] >= 0,
1927                             rec["original-diskbytes"])
1928             self.failUnless(rec["configured-diskbytes"] >= 0,
1929                             rec["configured-diskbytes"])
1930         d.addCallback(_after_first_cycle)
1931         d.addCallback(lambda ign: self.render1(webstatus))
1932         def _check_html(html):
1933             s = remove_tags(html)
1934             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1935             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1936             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1937         d.addCallback(_check_html)
1938         return d
1939
1940     def test_expire_cutoff_date(self):
1941         basedir = "storage/LeaseCrawler/expire_cutoff_date"
1942         fileutil.make_dirs(basedir)
1943         # setting cutoff-date to 2000 seconds ago means that any lease which
1944         # is more than 2000s old will be expired.
1945         now = time.time()
1946         then = int(now - 2000)
1947         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1948                                        expiration_enabled=True,
1949                                        expiration_mode="cutoff-date",
1950                                        expiration_cutoff_date=then)
1951         # make it start sooner than usual.
1952         lc = ss.lease_checker
1953         lc.slow_start = 0
1954         lc.stop_after_first_bucket = True
1955         webstatus = StorageStatus(ss)
1956
1957         # create a few shares, with some leases on them
1958         self.make_shares(ss)
1959         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1960
1961         def count_shares(si):
1962             return len(list(ss._iter_share_files(si)))
1963         def _get_sharefile(si):
1964             return list(ss._iter_share_files(si))[0]
1965         def count_leases(si):
1966             return len(list(_get_sharefile(si).get_leases()))
1967
1968         self.failUnlessEqual(count_shares(immutable_si_0), 1)
1969         self.failUnlessEqual(count_leases(immutable_si_0), 1)
1970         self.failUnlessEqual(count_shares(immutable_si_1), 1)
1971         self.failUnlessEqual(count_leases(immutable_si_1), 2)
1972         self.failUnlessEqual(count_shares(mutable_si_2), 1)
1973         self.failUnlessEqual(count_leases(mutable_si_2), 1)
1974         self.failUnlessEqual(count_shares(mutable_si_3), 1)
1975         self.failUnlessEqual(count_leases(mutable_si_3), 2)
1976
1977         # artificially crank back the expiration time on the first lease of
1978         # each share, to make it look like it was renewed 3000s ago. To achieve
1979         # this, we need to set the expiration time to now-3000+31days. This
1980         # will change when the lease format is improved to contain both
1981         # create/renew time and duration.
1982         new_expiration_time = now - 3000 + 31*24*60*60
1983
1984         # Some shares have an extra lease which is set to expire at the
1985         # default time in 31 days from now (age=31days). We then run the
1986         # crawler, which will expire the first lease, so some shares get
1987         # deleted and others stay alive (with one remaining lease).
1988
1989         sf0 = _get_sharefile(immutable_si_0)
1990         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1991         sf0_size = os.stat(sf0.home).st_size
1992
1993         # immutable_si_1 gets an extra lease
1994         sf1 = _get_sharefile(immutable_si_1)
1995         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1996
1997         sf2 = _get_sharefile(mutable_si_2)
1998         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1999         sf2_size = os.stat(sf2.home).st_size
2000
2001         # mutable_si_3 gets an extra lease
2002         sf3 = _get_sharefile(mutable_si_3)
2003         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2004
2005         ss.setServiceParent(self.s)
2006
2007         d = fireEventually()
2008         # examine the state right after the first bucket has been processed
2009         def _after_first_bucket(ignored):
2010             p = lc.get_progress()
2011             if not p["cycle-in-progress"]:
2012                 d2 = fireEventually()
2013                 d2.addCallback(_after_first_bucket)
2014                 return d2
2015         d.addCallback(_after_first_bucket)
2016         d.addCallback(lambda ign: self.render1(webstatus))
2017         def _check_html_in_cycle(html):
2018             s = remove_tags(html)
2019             # the first bucket encountered gets deleted, and its prefix
2020             # happens to be about 1/5th of the way through the ring, so the
2021             # predictor thinks we'll have 5 shares and that we'll delete them
2022             # all. This part of the test depends upon the SIs landing right
2023             # where they do now.
2024             self.failUnlessIn("The remainder of this cycle is expected to "
2025                               "recover: 4 shares, 4 buckets", s)
2026             self.failUnlessIn("The whole cycle is expected to examine "
2027                               "5 shares in 5 buckets and to recover: "
2028                               "5 shares, 5 buckets", s)
2029         d.addCallback(_check_html_in_cycle)
2030
2031         # wait for the crawler to finish the first cycle. Two shares should
2032         # have been removed
2033         def _wait():
2034             return lc.get_state()["last-cycle-finished"] is not None
2035         d.addCallback(lambda ign: self.poll(_wait))
2036
2037         def _after_first_cycle(ignored):
2038             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2039             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2040             self.failUnlessEqual(count_leases(immutable_si_1), 1)
2041             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2042             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2043             self.failUnlessEqual(count_leases(mutable_si_3), 1)
2044
2045             s = lc.get_state()
2046             last = s["history"][0]
2047
2048             self.failUnlessEqual(last["expiration-enabled"], True)
2049             self.failUnlessEqual(last["configured-expiration-mode"],
2050                                  ("cutoff-date", None, then,
2051                                   ("mutable", "immutable")))
2052             self.failUnlessEqual(last["leases-per-share-histogram"],
2053                                  {1: 2, 2: 2})
2054
2055             rec = last["space-recovered"]
2056             self.failUnlessEqual(rec["examined-buckets"], 4)
2057             self.failUnlessEqual(rec["examined-shares"], 4)
2058             self.failUnlessEqual(rec["actual-buckets"], 2)
2059             self.failUnlessEqual(rec["original-buckets"], 0)
2060             self.failUnlessEqual(rec["configured-buckets"], 2)
2061             self.failUnlessEqual(rec["actual-shares"], 2)
2062             self.failUnlessEqual(rec["original-shares"], 0)
2063             self.failUnlessEqual(rec["configured-shares"], 2)
2064             size = sf0_size + sf2_size
2065             self.failUnlessEqual(rec["actual-sharebytes"], size)
2066             self.failUnlessEqual(rec["original-sharebytes"], 0)
2067             self.failUnlessEqual(rec["configured-sharebytes"], size)
2068             # different platforms have different notions of "blocks used by
2069             # this file", so merely assert that it's a number
2070             self.failUnless(rec["actual-diskbytes"] >= 0,
2071                             rec["actual-diskbytes"])
2072             self.failUnless(rec["original-diskbytes"] >= 0,
2073                             rec["original-diskbytes"])
2074             self.failUnless(rec["configured-diskbytes"] >= 0,
2075                             rec["configured-diskbytes"])
2076         d.addCallback(_after_first_cycle)
2077         d.addCallback(lambda ign: self.render1(webstatus))
2078         def _check_html(html):
2079             s = remove_tags(html)
2080             self.failUnlessIn("Expiration Enabled:"
2081                               " expired leases will be removed", s)
2082             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2083             substr = "Leases created or last renewed before %s will be considered expired." % date
2084             self.failUnlessIn(substr, s)
2085             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2086         d.addCallback(_check_html)
2087         return d
2088
2089     def test_only_immutable(self):
2090         basedir = "storage/LeaseCrawler/only_immutable"
2091         fileutil.make_dirs(basedir)
2092         now = time.time()
2093         then = int(now - 2000)
2094         ss = StorageServer(basedir, "\x00" * 20,
2095                            expiration_enabled=True,
2096                            expiration_mode="cutoff-date",
2097                            expiration_cutoff_date=then,
2098                            expiration_sharetypes=("immutable",))
2099         lc = ss.lease_checker
2100         lc.slow_start = 0
2101         webstatus = StorageStatus(ss)
2102
2103         self.make_shares(ss)
2104         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2105         # set all leases to be expirable
2106         new_expiration_time = now - 3000 + 31*24*60*60
2107
2108         def count_shares(si):
2109             return len(list(ss._iter_share_files(si)))
2110         def _get_sharefile(si):
2111             return list(ss._iter_share_files(si))[0]
2112         def count_leases(si):
2113             return len(list(_get_sharefile(si).get_leases()))
2114
2115         sf0 = _get_sharefile(immutable_si_0)
2116         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2117         sf1 = _get_sharefile(immutable_si_1)
2118         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2119         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2120         sf2 = _get_sharefile(mutable_si_2)
2121         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2122         sf3 = _get_sharefile(mutable_si_3)
2123         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2124         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2125
2126         ss.setServiceParent(self.s)
2127         def _wait():
2128             return lc.get_state()["last-cycle-finished"] is not None
2129         d = self.poll(_wait)
2130
2131         def _after_first_cycle(ignored):
2132             self.failUnlessEqual(count_shares(immutable_si_0), 0)
2133             self.failUnlessEqual(count_shares(immutable_si_1), 0)
2134             self.failUnlessEqual(count_shares(mutable_si_2), 1)
2135             self.failUnlessEqual(count_leases(mutable_si_2), 1)
2136             self.failUnlessEqual(count_shares(mutable_si_3), 1)
2137             self.failUnlessEqual(count_leases(mutable_si_3), 2)
2138         d.addCallback(_after_first_cycle)
2139         d.addCallback(lambda ign: self.render1(webstatus))
2140         def _check_html(html):
2141             s = remove_tags(html)
2142             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2143         d.addCallback(_check_html)
2144         return d
2145
2146     def test_only_mutable(self):
2147         basedir = "storage/LeaseCrawler/only_mutable"
2148         fileutil.make_dirs(basedir)
2149         now = time.time()
2150         then = int(now - 2000)
2151         ss = StorageServer(basedir, "\x00" * 20,
2152                            expiration_enabled=True,
2153                            expiration_mode="cutoff-date",
2154                            expiration_cutoff_date=then,
2155                            expiration_sharetypes=("mutable",))
2156         lc = ss.lease_checker
2157         lc.slow_start = 0
2158         webstatus = StorageStatus(ss)
2159
2160         self.make_shares(ss)
2161         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2162         # set all leases to be expirable
2163         new_expiration_time = now - 3000 + 31*24*60*60
2164
2165         def count_shares(si):
2166             return len(list(ss._iter_share_files(si)))
2167         def _get_sharefile(si):
2168             return list(ss._iter_share_files(si))[0]
2169         def count_leases(si):
2170             return len(list(_get_sharefile(si).get_leases()))
2171
2172         sf0 = _get_sharefile(immutable_si_0)
2173         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2174         sf1 = _get_sharefile(immutable_si_1)
2175         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2176         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2177         sf2 = _get_sharefile(mutable_si_2)
2178         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2179         sf3 = _get_sharefile(mutable_si_3)
2180         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2181         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2182
2183         ss.setServiceParent(self.s)
2184         def _wait():
2185             return lc.get_state()["last-cycle-finished"] is not None
2186         d = self.poll(_wait)
2187
2188         def _after_first_cycle(ignored):
2189             self.failUnlessEqual(count_shares(immutable_si_0), 1)
2190             self.failUnlessEqual(count_leases(immutable_si_0), 1)
2191             self.failUnlessEqual(count_shares(immutable_si_1), 1)
2192             self.failUnlessEqual(count_leases(immutable_si_1), 2)
2193             self.failUnlessEqual(count_shares(mutable_si_2), 0)
2194             self.failUnlessEqual(count_shares(mutable_si_3), 0)
2195         d.addCallback(_after_first_cycle)
2196         d.addCallback(lambda ign: self.render1(webstatus))
2197         def _check_html(html):
2198             s = remove_tags(html)
2199             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2200         d.addCallback(_check_html)
2201         return d
2202
2203     def test_bad_mode(self):
2204         basedir = "storage/LeaseCrawler/bad_mode"
2205         fileutil.make_dirs(basedir)
2206         e = self.failUnlessRaises(ValueError,
2207                                   StorageServer, basedir, "\x00" * 20,
2208                                   expiration_mode="bogus")
2209         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
2210
2211     def test_parse_duration(self):
2212         DAY = 24*60*60
2213         MONTH = 31*DAY
2214         YEAR = 365*DAY
2215         p = time_format.parse_duration
2216         self.failUnlessEqual(p("7days"), 7*DAY)
2217         self.failUnlessEqual(p("31day"), 31*DAY)
2218         self.failUnlessEqual(p("60 days"), 60*DAY)
2219         self.failUnlessEqual(p("2mo"), 2*MONTH)
2220         self.failUnlessEqual(p("3 month"), 3*MONTH)
2221         self.failUnlessEqual(p("2years"), 2*YEAR)
2222         e = self.failUnlessRaises(ValueError, p, "2kumquats")
2223         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
2224
2225     def test_parse_date(self):
2226         p = time_format.parse_date
2227         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
2228         self.failUnlessEqual(p("2009-03-18"), 1237334400)
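        # 1237334400 is 2009-03-18 00:00:00 UTC in seconds since the epoch,
        # i.e. calendar.timegm((2009, 3, 18, 0, 0, 0, 0, 0, 0))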
2229
2230     def test_limited_history(self):
2231         basedir = "storage/LeaseCrawler/limited_history"
2232         fileutil.make_dirs(basedir)
2233         ss = StorageServer(basedir, "\x00" * 20)
2234         # make it start sooner than usual.
2235         lc = ss.lease_checker
2236         lc.slow_start = 0
2237         lc.cpu_slice = 500
2238
2239         # create a few shares, with some leases on them
2240         self.make_shares(ss)
2241
2242         ss.setServiceParent(self.s)
2243
2244         def _wait_until_15_cycles_done():
2245             last = lc.state["last-cycle-finished"]
2246             if last is not None and last >= 15:
2247                 return True
2248             if lc.timer:
2249                 lc.timer.reset(0)
2250             return False
2251         d = self.poll(_wait_until_15_cycles_done)
2252
2253         def _check(ignored):
2254             s = lc.get_state()
2255             h = s["history"]
2256             self.failUnlessEqual(len(h), 10)
2257             self.failUnlessEqual(max(h.keys()), 15)
2258             self.failUnlessEqual(min(h.keys()), 6)
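            # i.e. after cycles 0..15 have finished, only the 10 most recent
            # (6 through 15) are retained in the history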
2259         d.addCallback(_check)
2260         return d
2261
2262     def test_unpredictable_future(self):
2263         basedir = "storage/LeaseCrawler/unpredictable_future"
2264         fileutil.make_dirs(basedir)
2265         ss = StorageServer(basedir, "\x00" * 20)
2266         # make it start sooner than usual.
2267         lc = ss.lease_checker
2268         lc.slow_start = 0
2269         lc.cpu_slice = -1.0 # stop quickly
2270
2271         self.make_shares(ss)
2272
2273         ss.setServiceParent(self.s)
2274
2275         d = fireEventually()
2276         def _check(ignored):
2277             # this should fire after the first bucket is complete, but before
2278             # the first prefix is complete, so the progress-measurer won't
2279             # think we've gotten far enough to raise our percent-complete
2280             # above 0%, triggering the cannot-predict-the-future code in
2281             # expirer.py . This will have to change if/when the
2282             # progress-measurer gets smart enough to count buckets (we'll
2283             # have to interrupt it even earlier, before it's finished the
2284             # first bucket).
2285             s = lc.get_state()
2286             if "cycle-to-date" not in s:
2287                 d2 = fireEventually()
2288                 d2.addCallback(_check)
2289                 return d2
2290             self.failUnlessIn("cycle-to-date", s)
2291             self.failUnlessIn("estimated-remaining-cycle", s)
2292             self.failUnlessIn("estimated-current-cycle", s)
2293
2294             left = s["estimated-remaining-cycle"]["space-recovered"]
2295             self.failUnlessEqual(left["actual-buckets"], None)
2296             self.failUnlessEqual(left["original-buckets"], None)
2297             self.failUnlessEqual(left["configured-buckets"], None)
2298             self.failUnlessEqual(left["actual-shares"], None)
2299             self.failUnlessEqual(left["original-shares"], None)
2300             self.failUnlessEqual(left["configured-shares"], None)
2301             self.failUnlessEqual(left["actual-diskbytes"], None)
2302             self.failUnlessEqual(left["original-diskbytes"], None)
2303             self.failUnlessEqual(left["configured-diskbytes"], None)
2304             self.failUnlessEqual(left["actual-sharebytes"], None)
2305             self.failUnlessEqual(left["original-sharebytes"], None)
2306             self.failUnlessEqual(left["configured-sharebytes"], None)
2307
2308             full = s["estimated-current-cycle"]["space-recovered"]
2309             self.failUnlessEqual(full["actual-buckets"], None)
2310             self.failUnlessEqual(full["original-buckets"], None)
2311             self.failUnlessEqual(full["configured-buckets"], None)
2312             self.failUnlessEqual(full["actual-shares"], None)
2313             self.failUnlessEqual(full["original-shares"], None)
2314             self.failUnlessEqual(full["configured-shares"], None)
2315             self.failUnlessEqual(full["actual-diskbytes"], None)
2316             self.failUnlessEqual(full["original-diskbytes"], None)
2317             self.failUnlessEqual(full["configured-diskbytes"], None)
2318             self.failUnlessEqual(full["actual-sharebytes"], None)
2319             self.failUnlessEqual(full["original-sharebytes"], None)
2320             self.failUnlessEqual(full["configured-sharebytes"], None)
2321
2322         d.addCallback(_check)
2323         return d
2324
2325     def test_no_st_blocks(self):
2326         basedir = "storage/LeaseCrawler/no_st_blocks"
2327         fileutil.make_dirs(basedir)
2328         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2329                                         expiration_mode="age",
2330                                         expiration_override_lease_duration=-1000)
2331         # a negative expiration_override_lease_duration= means every lease
2332         # has already expired, so the "configured-" space-recovered counts
2333         # will all be non-zero
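             # (worked example, for illustration: a lease granted at time T
             #  with an override duration of -1000 is treated as if it
             #  expired at T-1000, i.e. 1000 seconds in the past)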
2334
2335         # make it start sooner than usual.
2336         lc = ss.lease_checker
2337         lc.slow_start = 0
2338
2339         self.make_shares(ss)
2340         ss.setServiceParent(self.s)
2341         def _wait():
2342             return lc.get_state()["last-cycle-finished"] is not None
2343         d = self.poll(_wait)
2344
2345         def _check(ignored):
2346             s = lc.get_state()
2347             last = s["history"][0]
2348             rec = last["space-recovered"]
2349             self.failUnlessEqual(rec["configured-buckets"], 4)
2350             self.failUnlessEqual(rec["configured-shares"], 4)
2351             self.failUnless(rec["configured-sharebytes"] > 0,
2352                             rec["configured-sharebytes"])
2353             # without the .st_blocks field in os.stat() results, we should be
2354             # reporting diskbytes==sharebytes
2355             self.failUnlessEqual(rec["configured-sharebytes"],
2356                                  rec["configured-diskbytes"])
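                 # (assumption about the usual case: when st_blocks *is*
                 #  present, diskbytes is derived from st_blocks * 512 --
                 #  POSIX counts st_blocks in 512-byte units -- so it is
                 #  normally rounded up to the allocation unit)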
2357         d.addCallback(_check)
2358         return d
2359
2360     def test_share_corruption(self):
2361         self._poll_should_ignore_these_errors = [
2362             UnknownMutableContainerVersionError,
2363             UnknownImmutableContainerVersionError,
2364             ]
2365         basedir = "storage/LeaseCrawler/share_corruption"
2366         fileutil.make_dirs(basedir)
2367         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2368         w = StorageStatus(ss)
2369         # make it start sooner than usual.
2370         lc = ss.lease_checker
2371         lc.stop_after_first_bucket = True
2372         lc.slow_start = 0
2373         lc.cpu_slice = 500
2374
2375         # create a few shares, with some leases on them
2376         self.make_shares(ss)
2377
2378         # now corrupt one, and make sure the lease-checker keeps going
2379         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2380         first = min(self.sis)
2381         first_b32 = base32.b2a(first)
2382         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2383         f = open(fn, "rb+")
2384         f.seek(0)
2385         f.write("BAD MAGIC")
2386         f.close()
2387         # if get_share_file() doesn't see the correct mutable magic, it
2388         # assumes the file is an immutable share, and then
2389         # immutable.ShareFile sees a bad version. So regardless of which kind
2390         # of share we corrupted, this will trigger an
2391         # UnknownImmutableContainerVersionError.
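             # (rough sketch of the assumed detection logic: get_share_file()
             #  reads the container header and compares it against
             #  MutableShareFile.MAGIC; anything that doesn't match is handed
             #  to immutable.ShareFile, whose version-field check then raises)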
2392
2393         # also create an empty bucket
2394         empty_si = base32.b2a("\x04"*16)
2395         empty_bucket_dir = os.path.join(ss.sharedir,
2396                                         storage_index_to_dir(empty_si))
2397         fileutil.make_dirs(empty_bucket_dir)
2398
2399         ss.setServiceParent(self.s)
2400
2401         d = fireEventually()
2402
2403         # now examine the state right after the first bucket has been
2404         # processed.
2405         def _after_first_bucket(ignored):
2406             s = lc.get_state()
2407             if "cycle-to-date" not in s:
2408                 d2 = fireEventually()
2409                 d2.addCallback(_after_first_bucket)
2410                 return d2
2411             so_far = s["cycle-to-date"]
2412             rec = so_far["space-recovered"]
2413             self.failUnlessEqual(rec["examined-buckets"], 1)
2414             self.failUnlessEqual(rec["examined-shares"], 0)
2415             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2416         d.addCallback(_after_first_bucket)
2417
2418         d.addCallback(lambda ign: self.render_json(w))
2419         def _check_json(json):
2420             data = simplejson.loads(json)
2421             # grr. json turns all dict keys into strings.
2422             so_far = data["lease-checker"]["cycle-to-date"]
2423             corrupt_shares = so_far["corrupt-shares"]
2424             # it also turns all tuples into lists
2425             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
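                 # (both conversions at once, for illustration:
                 #  simplejson.loads(simplejson.dumps({6: ("si", 0)}))
                 #  == {"6": ["si", 0]})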
2426         d.addCallback(_check_json)
2427         d.addCallback(lambda ign: self.render1(w))
2428         def _check_html(html):
2429             s = remove_tags(html)
2430             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2431         d.addCallback(_check_html)
2432
2433         def _wait():
2434             return lc.get_state()["last-cycle-finished"] is not None
2435         d.addCallback(lambda ign: self.poll(_wait))
2436
2437         def _after_first_cycle(ignored):
2438             s = lc.get_state()
2439             last = s["history"][0]
2440             rec = last["space-recovered"]
2441             self.failUnlessEqual(rec["examined-buckets"], 5)
2442             self.failUnlessEqual(rec["examined-shares"], 3)
2443             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2444         d.addCallback(_after_first_cycle)
2445         d.addCallback(lambda ign: self.render_json(w))
2446         def _check_json_history(json):
2447             data = simplejson.loads(json)
2448             last = data["lease-checker"]["history"]["0"]
2449             corrupt_shares = last["corrupt-shares"]
2450             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2451         d.addCallback(_check_json_history)
2452         d.addCallback(lambda ign: self.render1(w))
2453         def _check_html_history(html):
2454             s = remove_tags(html)
2455             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2456         d.addCallback(_check_html_history)
2457
2458         def _cleanup(res):
2459             self.flushLoggedErrors(UnknownMutableContainerVersionError,
2460                                    UnknownImmutableContainerVersionError)
2461             return res
2462         d.addBoth(_cleanup)
2463         return d
2464
2465     def render_json(self, page):
2466         d = self.render1(page, args={"t": ["json"]})
2467         return d
2468
2469 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2470
2471     def setUp(self):
2472         self.s = service.MultiService()
2473         self.s.startService()
2474     def tearDown(self):
2475         return self.s.stopService()
2476
2477     def test_no_server(self):
2478         w = StorageStatus(None)
2479         html = w.renderSynchronously()
2480         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
2481
2482     def test_status(self):
2483         basedir = "storage/WebStatus/status"
2484         fileutil.make_dirs(basedir)
2485         ss = StorageServer(basedir, "\x00" * 20)
2486         ss.setServiceParent(self.s)
2487         w = StorageStatus(ss)
2488         d = self.render1(w)
2489         def _check_html(html):
2490             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2491             s = remove_tags(html)
2492             self.failUnlessIn("Accepting new shares: Yes", s)
2493             self.failUnlessIn("Reserved space: - 0 B (0)", s)
2494         d.addCallback(_check_html)
2495         d.addCallback(lambda ign: self.render_json(w))
2496         def _check_json(json):
2497             data = simplejson.loads(json)
2498             s = data["stats"]
2499             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2500             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2501             self.failUnlessIn("bucket-counter", data)
2502             self.failUnlessIn("lease-checker", data)
2503         d.addCallback(_check_json)
2504         return d
2505
2506     def render_json(self, page):
2507         d = self.render1(page, args={"t": ["json"]})
2508         return d
2509
2510     @mock.patch('allmydata.util.fileutil.get_disk_stats')
2511     def test_status_no_disk_stats(self, mock_get_disk_stats):
2512         mock_get_disk_stats.side_effect = AttributeError()
2513
2514         # Some platforms may have no disk stats API. Make sure the code can handle that
2515         # (test runs on all platforms).
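             # (the AttributeError is assumed to model a platform whose os
             #  module lacks the underlying call entirely, e.g. no os.statvfs)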
2516         basedir = "storage/WebStatus/status_no_disk_stats"
2517         fileutil.make_dirs(basedir)
2518         ss = StorageServer(basedir, "\x00" * 20)
2519         ss.setServiceParent(self.s)
2520         w = StorageStatus(ss)
2521         html = w.renderSynchronously()
2522         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2523         s = remove_tags(html)
2524         self.failUnlessIn("Accepting new shares: Yes", s)
2525         self.failUnlessIn("Total disk space: ?", s)
2526         self.failUnlessIn("Space Available to Tahoe: ?", s)
2527         self.failUnless(ss.get_available_space() is None)
2528
2529     @mock.patch('allmydata.util.fileutil.get_disk_stats')
2530     def test_status_bad_disk_stats(self, mock_get_disk_stats):
2531         mock_get_disk_stats.side_effect = OSError()
2532
2533         # If the API to get disk stats exists but a call to it fails, then the status should
2534         # show that no shares will be accepted, and get_available_space() should be 0.
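             # (the OSError models a disk-stats call that exists but fails at
             #  runtime, e.g. statvfs on a stale or inaccessible mount)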
2535         basedir = "storage/WebStatus/status_bad_disk_stats"
2536         fileutil.make_dirs(basedir)
2537         ss = StorageServer(basedir, "\x00" * 20)
2538         ss.setServiceParent(self.s)
2539         w = StorageStatus(ss)
2540         html = w.renderSynchronously()
2541         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2542         s = remove_tags(html)
2543         self.failUnlessIn("Accepting new shares: No", s)
2544         self.failUnlessIn("Total disk space: ?", s)
2545         self.failUnlessIn("Space Available to Tahoe: ?", s)
2546         self.failUnlessEqual(ss.get_available_space(), 0)
2547
2548     def test_readonly(self):
2549         basedir = "storage/WebStatus/readonly"
2550         fileutil.make_dirs(basedir)
2551         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
2552         ss.setServiceParent(self.s)
2553         w = StorageStatus(ss)
2554         html = w.renderSynchronously()
2555         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2556         s = remove_tags(html)
2557         self.failUnlessIn("Accepting new shares: No", s)
2558
2559     def test_reserved(self):
2560         basedir = "storage/WebStatus/reserved"
2561         fileutil.make_dirs(basedir)
2562         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
2563         ss.setServiceParent(self.s)
2564         w = StorageStatus(ss)
2565         html = w.renderSynchronously()
2566         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2567         s = remove_tags(html)
2568         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
2569
2570     def test_huge_reserved(self):
2571         basedir = "storage/WebStatus/huge_reserved"
2572         fileutil.make_dirs(basedir)
2573         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e12)
2574         ss.setServiceParent(self.s)
2575         w = StorageStatus(ss)
2576         html = w.renderSynchronously()
2577         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2578         s = remove_tags(html)
2579         self.failUnlessIn("Reserved space: - 10.00 TB (10000000000000)", s)
2580
2581     def test_util(self):
2582         w = StorageStatus(None)
2583         self.failUnlessEqual(w.render_space(None, None), "?")
2584         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
2585         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
2586         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
2587         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
2588         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)