]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
2765a913447db544d1f6a2f1a7d59a178c9da9af
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1 import time, os.path, platform, stat, re, simplejson, struct, shutil
2
3 import mock
4
5 from twisted.trial import unittest
6
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
10 import itertools
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22      ReadBucketProxy
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24                                      LayoutInvalid, MDMFSIGNABLEHEADER, \
25                                      SIGNED_PREFIX, MDMFHEADER, \
26                                      MDMFOFFSETS, SDMFSlotWriteProxy, \
27                                      PRIVATE_KEY_SIZE, \
28                                      SIGNATURE_SIZE, \
29                                      VERIFICATION_KEY_SIZE, \
30                                      SHARE_HASH_CHAIN_SIZE
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
36
37 class Marker:
38     pass
39 class FakeCanary:
40     def __init__(self, ignore_disconnectors=False):
41         self.ignore = ignore_disconnectors
42         self.disconnectors = {}
43     def notifyOnDisconnect(self, f, *args, **kwargs):
44         if self.ignore:
45             return
46         m = Marker()
47         self.disconnectors[m] = (f, args, kwargs)
48         return m
49     def dontNotifyOnDisconnect(self, marker):
50         if self.ignore:
51             return
52         del self.disconnectors[marker]
53
54 class FakeStatsProvider:
55     def count(self, name, delta=1):
56         pass
57     def register_producer(self, producer):
58         pass
59
60 class Bucket(unittest.TestCase):
61     def make_workdir(self, name):
62         basedir = os.path.join("storage", "Bucket", name)
63         incoming = os.path.join(basedir, "tmp", "bucket")
64         final = os.path.join(basedir, "bucket")
65         fileutil.make_dirs(basedir)
66         fileutil.make_dirs(os.path.join(basedir, "tmp"))
67         return incoming, final
68
69     def bucket_writer_closed(self, bw, consumed):
70         pass
71     def add_latency(self, category, latency):
72         pass
73     def count(self, name, delta=1):
74         pass
75
76     def make_lease(self):
77         owner_num = 0
78         renew_secret = os.urandom(32)
79         cancel_secret = os.urandom(32)
80         expiration_time = time.time() + 5000
81         return LeaseInfo(owner_num, renew_secret, cancel_secret,
82                          expiration_time, "\x00" * 20)
83
84     def test_create(self):
85         incoming, final = self.make_workdir("test_create")
86         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
87                           FakeCanary())
88         bw.remote_write(0, "a"*25)
89         bw.remote_write(25, "b"*25)
90         bw.remote_write(50, "c"*25)
91         bw.remote_write(75, "d"*7)
92         bw.remote_close()
93
94     def test_readwrite(self):
95         incoming, final = self.make_workdir("test_readwrite")
96         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
97                           FakeCanary())
98         bw.remote_write(0, "a"*25)
99         bw.remote_write(25, "b"*25)
100         bw.remote_write(50, "c"*7) # last block may be short
101         bw.remote_close()
102
103         # now read from it
104         br = BucketReader(self, bw.finalhome)
105         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
108
109 class RemoteBucket:
110
111     def __init__(self):
112         self.read_count = 0
113         self.write_count = 0
114
115     def callRemote(self, methname, *args, **kwargs):
116         def _call():
117             meth = getattr(self.target, "remote_" + methname)
118             return meth(*args, **kwargs)
119
120         if methname == "slot_readv":
121             self.read_count += 1
122         if "writev" in methname:
123             self.write_count += 1
124
125         return defer.maybeDeferred(_call)
126
127
128 class BucketProxy(unittest.TestCase):
129     def make_bucket(self, name, size):
130         basedir = os.path.join("storage", "BucketProxy", name)
131         incoming = os.path.join(basedir, "tmp", "bucket")
132         final = os.path.join(basedir, "bucket")
133         fileutil.make_dirs(basedir)
134         fileutil.make_dirs(os.path.join(basedir, "tmp"))
135         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
136                           FakeCanary())
137         rb = RemoteBucket()
138         rb.target = bw
139         return bw, rb, final
140
141     def make_lease(self):
142         owner_num = 0
143         renew_secret = os.urandom(32)
144         cancel_secret = os.urandom(32)
145         expiration_time = time.time() + 5000
146         return LeaseInfo(owner_num, renew_secret, cancel_secret,
147                          expiration_time, "\x00" * 20)
148
149     def bucket_writer_closed(self, bw, consumed):
150         pass
151     def add_latency(self, category, latency):
152         pass
153     def count(self, name, delta=1):
154         pass
155
156     def test_create(self):
157         bw, rb, sharefname = self.make_bucket("test_create", 500)
158         bp = WriteBucketProxy(rb, None,
159                               data_size=300,
160                               block_size=10,
161                               num_segments=5,
162                               num_share_hashes=3,
163                               uri_extension_size_max=500)
164         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
165
166     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
167         # Let's pretend each share has 100 bytes of data, and that there are
168         # 4 segments (25 bytes each), and 8 shares total. So the two
169         # per-segment merkle trees (crypttext_hash_tree,
170         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
171         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
172         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
173         # long. That should make the whole share:
174         #
175         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
176         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
177
178         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
179
180         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
181                             for i in range(7)]
182         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
183                         for i in range(7)]
184         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
185                         for i in (1,9,13)]
186         uri_extension = "s" + "E"*498 + "e"
187
188         bw, rb, sharefname = self.make_bucket(name, sharesize)
189         bp = wbp_class(rb, None,
190                        data_size=95,
191                        block_size=25,
192                        num_segments=4,
193                        num_share_hashes=3,
194                        uri_extension_size_max=len(uri_extension))
195
196         d = bp.put_header()
197         d.addCallback(lambda res: bp.put_block(0, "a"*25))
198         d.addCallback(lambda res: bp.put_block(1, "b"*25))
199         d.addCallback(lambda res: bp.put_block(2, "c"*25))
200         d.addCallback(lambda res: bp.put_block(3, "d"*20))
201         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
202         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
203         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
204         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
205         d.addCallback(lambda res: bp.close())
206
207         # now read everything back
208         def _start_reading(res):
209             br = BucketReader(self, sharefname)
210             rb = RemoteBucket()
211             rb.target = br
212             server = NoNetworkServer("abc", None)
213             rbp = rbp_class(rb, server, storage_index="")
214             self.failUnlessIn("to peer", repr(rbp))
215             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
216
217             d1 = rbp.get_block_data(0, 25, 25)
218             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
219             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
220             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
221             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
222             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
223             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
224             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
225
226             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
227             d1.addCallback(lambda res:
228                            self.failUnlessEqual(res, crypttext_hashes))
229             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
230             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
231             d1.addCallback(lambda res: rbp.get_share_hashes())
232             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
233             d1.addCallback(lambda res: rbp.get_uri_extension())
234             d1.addCallback(lambda res:
235                            self.failUnlessEqual(res, uri_extension))
236
237             return d1
238
239         d.addCallback(_start_reading)
240
241         return d
242
243     def test_readwrite_v1(self):
244         return self._do_test_readwrite("test_readwrite_v1",
245                                        0x24, WriteBucketProxy, ReadBucketProxy)
246
247     def test_readwrite_v2(self):
248         return self._do_test_readwrite("test_readwrite_v2",
249                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
250
251 class Server(unittest.TestCase):
252
253     def setUp(self):
254         self.sparent = LoggingServiceParent()
255         self.sparent.startService()
256         self._lease_secret = itertools.count()
257     def tearDown(self):
258         return self.sparent.stopService()
259
260     def workdir(self, name):
261         basedir = os.path.join("storage", "Server", name)
262         return basedir
263
264     def create(self, name, reserved_space=0, klass=StorageServer):
265         workdir = self.workdir(name)
266         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
267                    stats_provider=FakeStatsProvider())
268         ss.setServiceParent(self.sparent)
269         return ss
270
271     def test_create(self):
272         self.create("test_create")
273
274     def allocate(self, ss, storage_index, sharenums, size, canary=None):
275         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
276         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
277         if not canary:
278             canary = FakeCanary()
279         return ss.remote_allocate_buckets(storage_index,
280                                           renew_secret, cancel_secret,
281                                           sharenums, size, canary)
282
283     def test_large_share(self):
284         syslow = platform.system().lower()
285         if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
286             raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
287
288         avail = fileutil.get_available_space('.', 512*2**20)
289         if avail <= 4*2**30:
290             raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
291
292         ss = self.create("test_large_share")
293
294         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
295         self.failUnlessEqual(already, set())
296         self.failUnlessEqual(set(writers.keys()), set([0]))
297
298         shnum, bucket = writers.items()[0]
299         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
300         bucket.remote_write(2**32, "ab")
301         bucket.remote_close()
302
303         readers = ss.remote_get_buckets("allocate")
304         reader = readers[shnum]
305         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
306
307     def test_dont_overfill_dirs(self):
308         """
309         This test asserts that if you add a second share whose storage index
310         share lots of leading bits with an extant share (but isn't the exact
311         same storage index), this won't add an entry to the share directory.
312         """
313         ss = self.create("test_dont_overfill_dirs")
314         already, writers = self.allocate(ss, "storageindex", [0], 10)
315         for i, wb in writers.items():
316             wb.remote_write(0, "%10d" % i)
317             wb.remote_close()
318         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
319                                 "shares")
320         children_of_storedir = set(os.listdir(storedir))
321
322         # Now store another one under another storageindex that has leading
323         # chars the same as the first storageindex.
324         already, writers = self.allocate(ss, "storageindey", [0], 10)
325         for i, wb in writers.items():
326             wb.remote_write(0, "%10d" % i)
327             wb.remote_close()
328         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
329                                 "shares")
330         new_children_of_storedir = set(os.listdir(storedir))
331         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
332
333     def test_remove_incoming(self):
334         ss = self.create("test_remove_incoming")
335         already, writers = self.allocate(ss, "vid", range(3), 10)
336         for i,wb in writers.items():
337             wb.remote_write(0, "%10d" % i)
338             wb.remote_close()
339         incoming_share_dir = wb.incominghome
340         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
341         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
342         incoming_dir = os.path.dirname(incoming_prefix_dir)
343         self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
344         self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
345         self.failUnless(os.path.exists(incoming_dir), incoming_dir)
346
347     def test_abort(self):
348         # remote_abort, when called on a writer, should make sure that
349         # the allocated size of the bucket is not counted by the storage
350         # server when accounting for space.
351         ss = self.create("test_abort")
352         already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
353         self.failIfEqual(ss.allocated_size(), 0)
354
355         # Now abort the writers.
356         for writer in writers.itervalues():
357             writer.remote_abort()
358         self.failUnlessEqual(ss.allocated_size(), 0)
359
360
361     def test_allocate(self):
362         ss = self.create("test_allocate")
363
364         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
365
366         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
367         self.failUnlessEqual(already, set())
368         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
369
370         # while the buckets are open, they should not count as readable
371         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
372
373         # close the buckets
374         for i,wb in writers.items():
375             wb.remote_write(0, "%25d" % i)
376             wb.remote_close()
377             # aborting a bucket that was already closed is a no-op
378             wb.remote_abort()
379
380         # now they should be readable
381         b = ss.remote_get_buckets("allocate")
382         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
383         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
384         b_str = str(b[0])
385         self.failUnlessIn("BucketReader", b_str)
386         self.failUnlessIn("mfwgy33dmf2g 0", b_str)
387
388         # now if we ask about writing again, the server should offer those
389         # three buckets as already present. It should offer them even if we
390         # don't ask about those specific ones.
391         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
392         self.failUnlessEqual(already, set([0,1,2]))
393         self.failUnlessEqual(set(writers.keys()), set([3,4]))
394
395         # while those two buckets are open for writing, the server should
396         # refuse to offer them to uploaders
397
398         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
399         self.failUnlessEqual(already2, set([0,1,2]))
400         self.failUnlessEqual(set(writers2.keys()), set([5]))
401
402         # aborting the writes should remove the tempfiles
403         for i,wb in writers2.items():
404             wb.remote_abort()
405         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
406         self.failUnlessEqual(already2, set([0,1,2]))
407         self.failUnlessEqual(set(writers2.keys()), set([5]))
408
409         for i,wb in writers2.items():
410             wb.remote_abort()
411         for i,wb in writers.items():
412             wb.remote_abort()
413
414     def test_bad_container_version(self):
415         ss = self.create("test_bad_container_version")
416         a,w = self.allocate(ss, "si1", [0], 10)
417         w[0].remote_write(0, "\xff"*10)
418         w[0].remote_close()
419
420         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
421         f = open(fn, "rb+")
422         f.seek(0)
423         f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
424         f.close()
425
426         ss.remote_get_buckets("allocate")
427
428         e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
429                                   ss.remote_get_buckets, "si1")
430         self.failUnlessIn(" had version 0 but we wanted 1", str(e))
431
432     def test_disconnect(self):
433         # simulate a disconnection
434         ss = self.create("test_disconnect")
435         canary = FakeCanary()
436         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
437         self.failUnlessEqual(already, set())
438         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
439         for (f,args,kwargs) in canary.disconnectors.values():
440             f(*args, **kwargs)
441         del already
442         del writers
443
444         # that ought to delete the incoming shares
445         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
446         self.failUnlessEqual(already, set())
447         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
448
449     @mock.patch('allmydata.util.fileutil.get_disk_stats')
450     def test_reserved_space(self, mock_get_disk_stats):
451         reserved_space=10000
452         mock_get_disk_stats.return_value = {
453             'free_for_nonroot': 15000,
454             'avail': max(15000 - reserved_space, 0),
455             }
456
457         ss = self.create("test_reserved_space", reserved_space=reserved_space)
458         # 15k available, 10k reserved, leaves 5k for shares
459
460         # a newly created and filled share incurs this much overhead, beyond
461         # the size we request.
462         OVERHEAD = 3*4
463         LEASE_SIZE = 4+32+32+4
464         canary = FakeCanary(True)
465         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
466         self.failUnlessEqual(len(writers), 3)
467         # now the StorageServer should have 3000 bytes provisionally
468         # allocated, allowing only 2000 more to be claimed
469         self.failUnlessEqual(len(ss._active_writers), 3)
470
471         # allocating 1001-byte shares only leaves room for one
472         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
473         self.failUnlessEqual(len(writers2), 1)
474         self.failUnlessEqual(len(ss._active_writers), 4)
475
476         # we abandon the first set, so their provisional allocation should be
477         # returned
478         del already
479         del writers
480         self.failUnlessEqual(len(ss._active_writers), 1)
481         # now we have a provisional allocation of 1001 bytes
482
483         # and we close the second set, so their provisional allocation should
484         # become real, long-term allocation, and grows to include the
485         # overhead.
486         for bw in writers2.values():
487             bw.remote_write(0, "a"*25)
488             bw.remote_close()
489         del already2
490         del writers2
491         del bw
492         self.failUnlessEqual(len(ss._active_writers), 0)
493
494         allocated = 1001 + OVERHEAD + LEASE_SIZE
495
496         # we have to manually increase available, since we're not doing real
497         # disk measurements
498         mock_get_disk_stats.return_value = {
499             'free_for_nonroot': 15000 - allocated,
500             'avail': max(15000 - allocated - reserved_space, 0),
501             }
502
503         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
504         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
505         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
506         self.failUnlessEqual(len(writers3), 39)
507         self.failUnlessEqual(len(ss._active_writers), 39)
508
509         del already3
510         del writers3
511         self.failUnlessEqual(len(ss._active_writers), 0)
512         ss.disownServiceParent()
513         del ss
514
515     def test_seek(self):
516         basedir = self.workdir("test_seek_behavior")
517         fileutil.make_dirs(basedir)
518         filename = os.path.join(basedir, "testfile")
519         f = open(filename, "wb")
520         f.write("start")
521         f.close()
522         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
523         # files. mode="a" preserves previous contents but does not allow
524         # seeking-to-create-holes. mode="r+" allows both.
525         f = open(filename, "rb+")
526         f.seek(100)
527         f.write("100")
528         f.close()
529         filelen = os.stat(filename)[stat.ST_SIZE]
530         self.failUnlessEqual(filelen, 100+3)
531         f2 = open(filename, "rb")
532         self.failUnlessEqual(f2.read(5), "start")
533
534
535     def test_leases(self):
536         ss = self.create("test_leases")
537         canary = FakeCanary()
538         sharenums = range(5)
539         size = 100
540
541         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
542                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
543         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
544                                                      sharenums, size, canary)
545         self.failUnlessEqual(len(already), 0)
546         self.failUnlessEqual(len(writers), 5)
547         for wb in writers.values():
548             wb.remote_close()
549
550         leases = list(ss.get_leases("si0"))
551         self.failUnlessEqual(len(leases), 1)
552         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
553
554         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
555                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
556         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
557                                                      sharenums, size, canary)
558         for wb in writers.values():
559             wb.remote_close()
560
561         # take out a second lease on si1
562         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
563                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
564         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
565                                                      sharenums, size, canary)
566         self.failUnlessEqual(len(already), 5)
567         self.failUnlessEqual(len(writers), 0)
568
569         leases = list(ss.get_leases("si1"))
570         self.failUnlessEqual(len(leases), 2)
571         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
572
573         # and a third lease, using add-lease
574         rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
575                      hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
576         ss.remote_add_lease("si1", rs2a, cs2a)
577         leases = list(ss.get_leases("si1"))
578         self.failUnlessEqual(len(leases), 3)
579         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
580
581         # add-lease on a missing storage index is silently ignored
582         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
583
584         # check that si0 is readable
585         readers = ss.remote_get_buckets("si0")
586         self.failUnlessEqual(len(readers), 5)
587
588         # renew the first lease. Only the proper renew_secret should work
589         ss.remote_renew_lease("si0", rs0)
590         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
591         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
592
593         # check that si0 is still readable
594         readers = ss.remote_get_buckets("si0")
595         self.failUnlessEqual(len(readers), 5)
596
597         # now cancel it
598         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
599         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
600         ss.remote_cancel_lease("si0", cs0)
601
602         # si0 should now be gone
603         readers = ss.remote_get_buckets("si0")
604         self.failUnlessEqual(len(readers), 0)
605         # and the renew should no longer work
606         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
607
608
609         # cancel the first lease on si1, leaving the second and third in place
610         ss.remote_cancel_lease("si1", cs1)
611         readers = ss.remote_get_buckets("si1")
612         self.failUnlessEqual(len(readers), 5)
613         # the corresponding renew should no longer work
614         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
615
616         leases = list(ss.get_leases("si1"))
617         self.failUnlessEqual(len(leases), 2)
618         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
619
620         ss.remote_renew_lease("si1", rs2)
621         # cancelling the second and third should make it go away
622         ss.remote_cancel_lease("si1", cs2)
623         ss.remote_cancel_lease("si1", cs2a)
624         readers = ss.remote_get_buckets("si1")
625         self.failUnlessEqual(len(readers), 0)
626         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
627         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
628         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
629
630         leases = list(ss.get_leases("si1"))
631         self.failUnlessEqual(len(leases), 0)
632
633
634         # test overlapping uploads
635         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
636                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
637         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
638                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
639         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
640                                                      sharenums, size, canary)
641         self.failUnlessEqual(len(already), 0)
642         self.failUnlessEqual(len(writers), 5)
643         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
644                                                        sharenums, size, canary)
645         self.failUnlessEqual(len(already2), 0)
646         self.failUnlessEqual(len(writers2), 0)
647         for wb in writers.values():
648             wb.remote_close()
649
650         leases = list(ss.get_leases("si3"))
651         self.failUnlessEqual(len(leases), 1)
652
653         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
654                                                        sharenums, size, canary)
655         self.failUnlessEqual(len(already3), 5)
656         self.failUnlessEqual(len(writers3), 0)
657
658         leases = list(ss.get_leases("si3"))
659         self.failUnlessEqual(len(leases), 2)
660
661     def test_readonly(self):
662         workdir = self.workdir("test_readonly")
663         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
664         ss.setServiceParent(self.sparent)
665
666         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
667         self.failUnlessEqual(already, set())
668         self.failUnlessEqual(writers, {})
669
670         stats = ss.get_stats()
671         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
672         if "storage_server.disk_avail" in stats:
673             # Some platforms may not have an API to get disk stats.
674             # But if there are stats, readonly_storage means disk_avail=0
675             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
676
677     def test_discard(self):
678         # discard is really only used for other tests, but we test it anyways
679         workdir = self.workdir("test_discard")
680         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
681         ss.setServiceParent(self.sparent)
682
683         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
684         self.failUnlessEqual(already, set())
685         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
686         for i,wb in writers.items():
687             wb.remote_write(0, "%25d" % i)
688             wb.remote_close()
689         # since we discard the data, the shares should be present but sparse.
690         # Since we write with some seeks, the data we read back will be all
691         # zeros.
692         b = ss.remote_get_buckets("vid")
693         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
694         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
695
696     def test_advise_corruption(self):
697         workdir = self.workdir("test_advise_corruption")
698         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
699         ss.setServiceParent(self.sparent)
700
701         si0_s = base32.b2a("si0")
702         ss.remote_advise_corrupt_share("immutable", "si0", 0,
703                                        "This share smells funny.\n")
704         reportdir = os.path.join(workdir, "corruption-advisories")
705         reports = os.listdir(reportdir)
706         self.failUnlessEqual(len(reports), 1)
707         report_si0 = reports[0]
708         self.failUnlessIn(si0_s, report_si0)
709         f = open(os.path.join(reportdir, report_si0), "r")
710         report = f.read()
711         f.close()
712         self.failUnlessIn("type: immutable", report)
713         self.failUnlessIn("storage_index: %s" % si0_s, report)
714         self.failUnlessIn("share_number: 0", report)
715         self.failUnlessIn("This share smells funny.", report)
716
717         # test the RIBucketWriter version too
718         si1_s = base32.b2a("si1")
719         already,writers = self.allocate(ss, "si1", [1], 75)
720         self.failUnlessEqual(already, set())
721         self.failUnlessEqual(set(writers.keys()), set([1]))
722         writers[1].remote_write(0, "data")
723         writers[1].remote_close()
724
725         b = ss.remote_get_buckets("si1")
726         self.failUnlessEqual(set(b.keys()), set([1]))
727         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
728
729         reports = os.listdir(reportdir)
730         self.failUnlessEqual(len(reports), 2)
731         report_si1 = [r for r in reports if si1_s in r][0]
732         f = open(os.path.join(reportdir, report_si1), "r")
733         report = f.read()
734         f.close()
735         self.failUnlessIn("type: immutable", report)
736         self.failUnlessIn("storage_index: %s" % si1_s, report)
737         self.failUnlessIn("share_number: 1", report)
738         self.failUnlessIn("This share tastes like dust.", report)
739
740
741
742 class MutableServer(unittest.TestCase):
743
744     def setUp(self):
745         self.sparent = LoggingServiceParent()
746         self._lease_secret = itertools.count()
747     def tearDown(self):
748         return self.sparent.stopService()
749
750     def workdir(self, name):
751         basedir = os.path.join("storage", "MutableServer", name)
752         return basedir
753
754     def create(self, name):
755         workdir = self.workdir(name)
756         ss = StorageServer(workdir, "\x00" * 20)
757         ss.setServiceParent(self.sparent)
758         return ss
759
760     def test_create(self):
761         self.create("test_create")
762
763     def write_enabler(self, we_tag):
764         return hashutil.tagged_hash("we_blah", we_tag)
765
766     def renew_secret(self, tag):
767         return hashutil.tagged_hash("renew_blah", str(tag))
768
769     def cancel_secret(self, tag):
770         return hashutil.tagged_hash("cancel_blah", str(tag))
771
772     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
773         write_enabler = self.write_enabler(we_tag)
774         renew_secret = self.renew_secret(lease_tag)
775         cancel_secret = self.cancel_secret(lease_tag)
776         rstaraw = ss.remote_slot_testv_and_readv_and_writev
777         testandwritev = dict( [ (shnum, ([], [], None) )
778                          for shnum in sharenums ] )
779         readv = []
780         rc = rstaraw(storage_index,
781                      (write_enabler, renew_secret, cancel_secret),
782                      testandwritev,
783                      readv)
784         (did_write, readv_data) = rc
785         self.failUnless(did_write)
786         self.failUnless(isinstance(readv_data, dict))
787         self.failUnlessEqual(len(readv_data), 0)
788
789     def test_bad_magic(self):
790         ss = self.create("test_bad_magic")
791         self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
792         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
793         f = open(fn, "rb+")
794         f.seek(0)
795         f.write("BAD MAGIC")
796         f.close()
797         read = ss.remote_slot_readv
798         e = self.failUnlessRaises(UnknownMutableContainerVersionError,
799                                   read, "si1", [0], [(0,10)])
800         self.failUnlessIn(" had magic ", str(e))
801         self.failUnlessIn(" but we wanted ", str(e))
802
803     def test_container_size(self):
804         ss = self.create("test_container_size")
805         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
806                       set([0,1,2]), 100)
807         read = ss.remote_slot_readv
808         rstaraw = ss.remote_slot_testv_and_readv_and_writev
809         secrets = ( self.write_enabler("we1"),
810                     self.renew_secret("we1"),
811                     self.cancel_secret("we1") )
812         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
813         answer = rstaraw("si1", secrets,
814                          {0: ([], [(0,data)], len(data)+12)},
815                          [])
816         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
817
818         # trying to make the container too large will raise an exception
819         TOOBIG = MutableShareFile.MAX_SIZE + 10
820         self.failUnlessRaises(DataTooLargeError,
821                               rstaraw, "si1", secrets,
822                               {0: ([], [(0,data)], TOOBIG)},
823                               [])
824
825         # it should be possible to make the container smaller, although at
826         # the moment this doesn't actually affect the share, unless the
827         # container size is dropped to zero, in which case the share is
828         # deleted.
829         answer = rstaraw("si1", secrets,
830                          {0: ([], [(0,data)], len(data)+8)},
831                          [])
832         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
833
834         answer = rstaraw("si1", secrets,
835                          {0: ([], [(0,data)], 0)},
836                          [])
837         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
838
839         read_answer = read("si1", [0], [(0,10)])
840         self.failUnlessEqual(read_answer, {})
841
842     def test_allocate(self):
843         ss = self.create("test_allocate")
844         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
845                       set([0,1,2]), 100)
846
847         read = ss.remote_slot_readv
848         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
849                              {0: [""]})
850         self.failUnlessEqual(read("si1", [], [(0, 10)]),
851                              {0: [""], 1: [""], 2: [""]})
852         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
853                              {0: [""]})
854
855         # try writing to one
856         secrets = ( self.write_enabler("we1"),
857                     self.renew_secret("we1"),
858                     self.cancel_secret("we1") )
859         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
860         write = ss.remote_slot_testv_and_readv_and_writev
861         answer = write("si1", secrets,
862                        {0: ([], [(0,data)], None)},
863                        [])
864         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
865
866         self.failUnlessEqual(read("si1", [0], [(0,20)]),
867                              {0: ["00000000001111111111"]})
868         self.failUnlessEqual(read("si1", [0], [(95,10)]),
869                              {0: ["99999"]})
870         #self.failUnlessEqual(s0.remote_get_length(), 100)
871
872         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
873         f = self.failUnlessRaises(BadWriteEnablerError,
874                                   write, "si1", bad_secrets,
875                                   {}, [])
876         self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
877
878         # this testv should fail
879         answer = write("si1", secrets,
880                        {0: ([(0, 12, "eq", "444444444444"),
881                              (20, 5, "eq", "22222"),
882                              ],
883                             [(0, "x"*100)],
884                             None),
885                         },
886                        [(0,12), (20,5)],
887                        )
888         self.failUnlessEqual(answer, (False,
889                                       {0: ["000000000011", "22222"],
890                                        1: ["", ""],
891                                        2: ["", ""],
892                                        }))
893         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
894
895         # as should this one
896         answer = write("si1", secrets,
897                        {0: ([(10, 5, "lt", "11111"),
898                              ],
899                             [(0, "x"*100)],
900                             None),
901                         },
902                        [(10,5)],
903                        )
904         self.failUnlessEqual(answer, (False,
905                                       {0: ["11111"],
906                                        1: [""],
907                                        2: [""]},
908                                       ))
909         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
910
911
912     def test_operators(self):
913         # test operators, the data we're comparing is '11111' in all cases.
914         # test both fail+pass, reset data after each one.
915         ss = self.create("test_operators")
916
917         secrets = ( self.write_enabler("we1"),
918                     self.renew_secret("we1"),
919                     self.cancel_secret("we1") )
920         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
921         write = ss.remote_slot_testv_and_readv_and_writev
922         read = ss.remote_slot_readv
923
924         def reset():
925             write("si1", secrets,
926                   {0: ([], [(0,data)], None)},
927                   [])
928
929         reset()
930
931         #  lt
932         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
933                                              ],
934                                             [(0, "x"*100)],
935                                             None,
936                                             )}, [(10,5)])
937         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
938         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
939         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
940         reset()
941
942         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
943                                              ],
944                                             [(0, "x"*100)],
945                                             None,
946                                             )}, [(10,5)])
947         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
948         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
949         reset()
950
951         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
952                                              ],
953                                             [(0, "y"*100)],
954                                             None,
955                                             )}, [(10,5)])
956         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
957         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
958         reset()
959
960         #  le
961         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
962                                              ],
963                                             [(0, "x"*100)],
964                                             None,
965                                             )}, [(10,5)])
966         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
967         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
968         reset()
969
970         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
971                                              ],
972                                             [(0, "y"*100)],
973                                             None,
974                                             )}, [(10,5)])
975         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
976         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
977         reset()
978
979         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
980                                              ],
981                                             [(0, "y"*100)],
982                                             None,
983                                             )}, [(10,5)])
984         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
985         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
986         reset()
987
988         #  eq
989         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
990                                              ],
991                                             [(0, "x"*100)],
992                                             None,
993                                             )}, [(10,5)])
994         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
995         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
996         reset()
997
998         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
999                                              ],
1000                                             [(0, "y"*100)],
1001                                             None,
1002                                             )}, [(10,5)])
1003         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1004         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1005         reset()
1006
1007         #  ne
1008         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1009                                              ],
1010                                             [(0, "x"*100)],
1011                                             None,
1012                                             )}, [(10,5)])
1013         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1014         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1015         reset()
1016
1017         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1018                                              ],
1019                                             [(0, "y"*100)],
1020                                             None,
1021                                             )}, [(10,5)])
1022         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1023         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1024         reset()
1025
1026         #  ge
1027         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1028                                              ],
1029                                             [(0, "y"*100)],
1030                                             None,
1031                                             )}, [(10,5)])
1032         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1033         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1034         reset()
1035
1036         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1037                                              ],
1038                                             [(0, "y"*100)],
1039                                             None,
1040                                             )}, [(10,5)])
1041         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1042         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1043         reset()
1044
1045         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1046                                              ],
1047                                             [(0, "y"*100)],
1048                                             None,
1049                                             )}, [(10,5)])
1050         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1051         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1052         reset()
1053
1054         #  gt
1055         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1056                                              ],
1057                                             [(0, "y"*100)],
1058                                             None,
1059                                             )}, [(10,5)])
1060         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1061         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1062         reset()
1063
1064         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1065                                              ],
1066                                             [(0, "x"*100)],
1067                                             None,
1068                                             )}, [(10,5)])
1069         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1070         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1071         reset()
1072
1073         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1074                                              ],
1075                                             [(0, "x"*100)],
1076                                             None,
1077                                             )}, [(10,5)])
1078         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1079         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1080         reset()
1081
1082         # finally, test some operators against empty shares
1083         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1084                                              ],
1085                                             [(0, "x"*100)],
1086                                             None,
1087                                             )}, [(10,5)])
1088         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1089         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1090         reset()
1091
1092     def test_readv(self):
1093         ss = self.create("test_readv")
1094         secrets = ( self.write_enabler("we1"),
1095                     self.renew_secret("we1"),
1096                     self.cancel_secret("we1") )
1097         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1098         write = ss.remote_slot_testv_and_readv_and_writev
1099         read = ss.remote_slot_readv
1100         data = [("%d" % i) * 100 for i in range(3)]
1101         rc = write("si1", secrets,
1102                    {0: ([], [(0,data[0])], None),
1103                     1: ([], [(0,data[1])], None),
1104                     2: ([], [(0,data[2])], None),
1105                     }, [])
1106         self.failUnlessEqual(rc, (True, {}))
1107
1108         answer = read("si1", [], [(0, 10)])
1109         self.failUnlessEqual(answer, {0: ["0"*10],
1110                                       1: ["1"*10],
1111                                       2: ["2"*10]})
1112
1113     def compare_leases_without_timestamps(self, leases_a, leases_b):
1114         self.failUnlessEqual(len(leases_a), len(leases_b))
1115         for i in range(len(leases_a)):
1116             a = leases_a[i]
1117             b = leases_b[i]
1118             self.failUnlessEqual(a.owner_num,       b.owner_num)
1119             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1120             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1121             self.failUnlessEqual(a.nodeid,          b.nodeid)
1122
1123     def compare_leases(self, leases_a, leases_b):
1124         self.failUnlessEqual(len(leases_a), len(leases_b))
1125         for i in range(len(leases_a)):
1126             a = leases_a[i]
1127             b = leases_b[i]
1128             self.failUnlessEqual(a.owner_num,       b.owner_num)
1129             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1130             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1131             self.failUnlessEqual(a.nodeid,          b.nodeid)
1132             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1133
1134     def test_leases(self):
1135         ss = self.create("test_leases")
1136         def secrets(n):
1137             return ( self.write_enabler("we1"),
1138                      self.renew_secret("we1-%d" % n),
1139                      self.cancel_secret("we1-%d" % n) )
1140         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1141         write = ss.remote_slot_testv_and_readv_and_writev
1142         read = ss.remote_slot_readv
1143         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1144         self.failUnlessEqual(rc, (True, {}))
1145
1146         # create a random non-numeric file in the bucket directory, to
1147         # exercise the code that's supposed to ignore those.
1148         bucket_dir = os.path.join(self.workdir("test_leases"),
1149                                   "shares", storage_index_to_dir("si1"))
1150         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1151         f.write("you ought to be ignoring me\n")
1152         f.close()
1153
1154         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1155         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1156
1157         # add-lease on a missing storage index is silently ignored
1158         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1159
1160         # re-allocate the slots and use the same secrets, that should update
1161         # the lease
1162         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1163         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1164
1165         # renew it directly
1166         ss.remote_renew_lease("si1", secrets(0)[1])
1167         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1168
1169         # now allocate them with a bunch of different secrets, to trigger the
1170         # extended lease code. Use add_lease for one of them.
1171         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1172         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1173         secrets2 = secrets(2)
1174         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1175         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1176         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1177         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1178         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1179
1180         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1181
1182         # cancel one of them
1183         ss.remote_cancel_lease("si1", secrets(5)[2])
1184         self.failUnlessEqual(len(list(s0.get_leases())), 5)
1185
1186         all_leases = list(s0.get_leases())
1187         # and write enough data to expand the container, forcing the server
1188         # to move the leases
1189         write("si1", secrets(0),
1190               {0: ([], [(0,data)], 200), },
1191               [])
1192
1193         # read back the leases, make sure they're still intact.
1194         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1195
1196         ss.remote_renew_lease("si1", secrets(0)[1])
1197         ss.remote_renew_lease("si1", secrets(1)[1])
1198         ss.remote_renew_lease("si1", secrets(2)[1])
1199         ss.remote_renew_lease("si1", secrets(3)[1])
1200         ss.remote_renew_lease("si1", secrets(4)[1])
1201         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1202         # get a new copy of the leases, with the current timestamps. Reading
1203         # data and failing to renew/cancel leases should leave the timestamps
1204         # alone.
1205         all_leases = list(s0.get_leases())
1206         # renewing with a bogus token should prompt an error message
1207
1208         # examine the exception thus raised, make sure the old nodeid is
1209         # present, to provide for share migration
1210         e = self.failUnlessRaises(IndexError,
1211                                   ss.remote_renew_lease, "si1",
1212                                   secrets(20)[1])
1213         e_s = str(e)
1214         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1215         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1216         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1217
1218         # same for cancelling
1219         self.failUnlessRaises(IndexError,
1220                               ss.remote_cancel_lease, "si1",
1221                               secrets(20)[2])
1222         self.compare_leases(all_leases, list(s0.get_leases()))
1223
1224         # reading shares should not modify the timestamp
1225         read("si1", [], [(0,200)])
1226         self.compare_leases(all_leases, list(s0.get_leases()))
1227
1228         write("si1", secrets(0),
1229               {0: ([], [(200, "make me bigger")], None)}, [])
1230         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1231
1232         write("si1", secrets(0),
1233               {0: ([], [(500, "make me really bigger")], None)}, [])
1234         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1235
1236         # now cancel them all
1237         ss.remote_cancel_lease("si1", secrets(0)[2])
1238         ss.remote_cancel_lease("si1", secrets(1)[2])
1239         ss.remote_cancel_lease("si1", secrets(2)[2])
1240         ss.remote_cancel_lease("si1", secrets(3)[2])
1241
1242         # the slot should still be there
1243         remaining_shares = read("si1", [], [(0,10)])
1244         self.failUnlessEqual(len(remaining_shares), 1)
1245         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1246
1247         # cancelling a non-existent lease should raise an IndexError
1248         self.failUnlessRaises(IndexError,
1249                               ss.remote_cancel_lease, "si1", "nonsecret")
1250
1251         # and the slot should still be there
1252         remaining_shares = read("si1", [], [(0,10)])
1253         self.failUnlessEqual(len(remaining_shares), 1)
1254         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1255
1256         ss.remote_cancel_lease("si1", secrets(4)[2])
1257         # now the slot should be gone
1258         no_shares = read("si1", [], [(0,10)])
1259         self.failUnlessEqual(no_shares, {})
1260
1261         # cancelling a lease on a non-existent share should raise an IndexError
1262         self.failUnlessRaises(IndexError,
1263                               ss.remote_cancel_lease, "si2", "nonsecret")
1264
1265     def test_remove(self):
1266         ss = self.create("test_remove")
1267         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1268                       set([0,1,2]), 100)
1269         readv = ss.remote_slot_readv
1270         writev = ss.remote_slot_testv_and_readv_and_writev
1271         secrets = ( self.write_enabler("we1"),
1272                     self.renew_secret("we1"),
1273                     self.cancel_secret("we1") )
1274         # delete sh0 by setting its size to zero
1275         answer = writev("si1", secrets,
1276                         {0: ([], [], 0)},
1277                         [])
1278         # the answer should mention all the shares that existed before the
1279         # write
1280         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1281         # but a new read should show only sh1 and sh2
1282         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1283                              {1: [""], 2: [""]})
1284
1285         # delete sh1 by setting its size to zero
1286         answer = writev("si1", secrets,
1287                         {1: ([], [], 0)},
1288                         [])
1289         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1290         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1291                              {2: [""]})
1292
1293         # delete sh2 by setting its size to zero
1294         answer = writev("si1", secrets,
1295                         {2: ([], [], 0)},
1296                         [])
1297         self.failUnlessEqual(answer, (True, {2:[]}) )
1298         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1299                              {})
1300         # and the bucket directory should now be gone
1301         si = base32.b2a("si1")
1302         # note: this is a detail of the storage server implementation, and
1303         # may change in the future
1304         prefix = si[:2]
1305         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1306         bucketdir = os.path.join(prefixdir, si)
1307         self.failUnless(os.path.exists(prefixdir), prefixdir)
1308         self.failIf(os.path.exists(bucketdir), bucketdir)
1309
1310
1311 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1312     def setUp(self):
1313         self.sparent = LoggingServiceParent()
1314         self._lease_secret = itertools.count()
1315         self.ss = self.create("MDMFProxies storage test server")
1316         self.rref = RemoteBucket()
1317         self.rref.target = self.ss
1318         self.secrets = (self.write_enabler("we_secret"),
1319                         self.renew_secret("renew_secret"),
1320                         self.cancel_secret("cancel_secret"))
1321         self.segment = "aaaaaa"
1322         self.block = "aa"
1323         self.salt = "a" * 16
1324         self.block_hash = "a" * 32
1325         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1326         self.share_hash = self.block_hash
1327         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1328         self.signature = "foobarbaz"
1329         self.verification_key = "vvvvvv"
1330         self.encprivkey = "private"
1331         self.root_hash = self.block_hash
1332         self.salt_hash = self.root_hash
1333         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1334         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1335         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1336         # blockhashes and salt hashes are serialized in the same way,
1337         # only we lop off the first element and store that in the
1338         # header.
1339         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1340
1341
1342     def tearDown(self):
1343         self.sparent.stopService()
1344         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1345
1346
1347     def write_enabler(self, we_tag):
1348         return hashutil.tagged_hash("we_blah", we_tag)
1349
1350
1351     def renew_secret(self, tag):
1352         return hashutil.tagged_hash("renew_blah", str(tag))
1353
1354
1355     def cancel_secret(self, tag):
1356         return hashutil.tagged_hash("cancel_blah", str(tag))
1357
1358
1359     def workdir(self, name):
1360         basedir = os.path.join("storage", "MutableServer", name)
1361         return basedir
1362
1363
1364     def create(self, name):
1365         workdir = self.workdir(name)
1366         ss = StorageServer(workdir, "\x00" * 20)
1367         ss.setServiceParent(self.sparent)
1368         return ss
1369
1370
1371     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1372         # Start with the checkstring
1373         data = struct.pack(">BQ32s",
1374                            1,
1375                            0,
1376                            self.root_hash)
1377         self.checkstring = data
1378         # Next, the encoding parameters
1379         if tail_segment:
1380             data += struct.pack(">BBQQ",
1381                                 3,
1382                                 10,
1383                                 6,
1384                                 33)
1385         elif empty:
1386             data += struct.pack(">BBQQ",
1387                                 3,
1388                                 10,
1389                                 0,
1390                                 0)
1391         else:
1392             data += struct.pack(">BBQQ",
1393                                 3,
1394                                 10,
1395                                 6,
1396                                 36)
1397         # Now we'll build the offsets.
1398         sharedata = ""
1399         if not tail_segment and not empty:
1400             for i in xrange(6):
1401                 sharedata += self.salt + self.block
1402         elif tail_segment:
1403             for i in xrange(5):
1404                 sharedata += self.salt + self.block
1405             sharedata += self.salt + "a"
1406
1407         # The encrypted private key comes after the shares + salts
1408         offset_size = struct.calcsize(MDMFOFFSETS)
1409         encrypted_private_key_offset = len(data) + offset_size
1410         # The share has chain comes after the private key
1411         sharehashes_offset = encrypted_private_key_offset + \
1412             len(self.encprivkey)
1413
1414         # The signature comes after the share hash chain.
1415         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1416
1417         verification_key_offset = signature_offset + len(self.signature)
1418         verification_key_end = verification_key_offset + \
1419             len(self.verification_key)
1420
1421         share_data_offset = offset_size
1422         share_data_offset += PRIVATE_KEY_SIZE
1423         share_data_offset += SIGNATURE_SIZE
1424         share_data_offset += VERIFICATION_KEY_SIZE
1425         share_data_offset += SHARE_HASH_CHAIN_SIZE
1426
1427         blockhashes_offset = share_data_offset + len(sharedata)
1428         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1429
1430         data += struct.pack(MDMFOFFSETS,
1431                             encrypted_private_key_offset,
1432                             sharehashes_offset,
1433                             signature_offset,
1434                             verification_key_offset,
1435                             verification_key_end,
1436                             share_data_offset,
1437                             blockhashes_offset,
1438                             eof_offset)
1439
1440         self.offsets = {}
1441         self.offsets['enc_privkey'] = encrypted_private_key_offset
1442         self.offsets['block_hash_tree'] = blockhashes_offset
1443         self.offsets['share_hash_chain'] = sharehashes_offset
1444         self.offsets['signature'] = signature_offset
1445         self.offsets['verification_key'] = verification_key_offset
1446         self.offsets['share_data'] = share_data_offset
1447         self.offsets['verification_key_end'] = verification_key_end
1448         self.offsets['EOF'] = eof_offset
1449
1450         # the private key,
1451         data += self.encprivkey
1452         # the sharehashes
1453         data += self.share_hash_chain_s
1454         # the signature,
1455         data += self.signature
1456         # and the verification key
1457         data += self.verification_key
1458         # Then we'll add in gibberish until we get to the right point.
1459         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1460         data += nulls
1461
1462         # Then the share data
1463         data += sharedata
1464         # the blockhashes
1465         data += self.block_hash_tree_s
1466         return data
1467
1468
1469     def write_test_share_to_server(self,
1470                                    storage_index,
1471                                    tail_segment=False,
1472                                    empty=False):
1473         """
1474         I write some data for the read tests to read to self.ss
1475
1476         If tail_segment=True, then I will write a share that has a
1477         smaller tail segment than other segments.
1478         """
1479         write = self.ss.remote_slot_testv_and_readv_and_writev
1480         data = self.build_test_mdmf_share(tail_segment, empty)
1481         # Finally, we write the whole thing to the storage server in one
1482         # pass.
1483         testvs = [(0, 1, "eq", "")]
1484         tws = {}
1485         tws[0] = (testvs, [(0, data)], None)
1486         readv = [(0, 1)]
1487         results = write(storage_index, self.secrets, tws, readv)
1488         self.failUnless(results[0])
1489
1490
1491     def build_test_sdmf_share(self, empty=False):
1492         if empty:
1493             sharedata = ""
1494         else:
1495             sharedata = self.segment * 6
1496         self.sharedata = sharedata
1497         blocksize = len(sharedata) / 3
1498         block = sharedata[:blocksize]
1499         self.blockdata = block
1500         prefix = struct.pack(">BQ32s16s BBQQ",
1501                              0, # version,
1502                              0,
1503                              self.root_hash,
1504                              self.salt,
1505                              3,
1506                              10,
1507                              len(sharedata),
1508                              len(sharedata),
1509                             )
1510         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1511         signature_offset = post_offset + len(self.verification_key)
1512         sharehashes_offset = signature_offset + len(self.signature)
1513         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1514         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1515         encprivkey_offset = sharedata_offset + len(block)
1516         eof_offset = encprivkey_offset + len(self.encprivkey)
1517         offsets = struct.pack(">LLLLQQ",
1518                               signature_offset,
1519                               sharehashes_offset,
1520                               blockhashes_offset,
1521                               sharedata_offset,
1522                               encprivkey_offset,
1523                               eof_offset)
1524         final_share = "".join([prefix,
1525                            offsets,
1526                            self.verification_key,
1527                            self.signature,
1528                            self.share_hash_chain_s,
1529                            self.block_hash_tree_s,
1530                            block,
1531                            self.encprivkey])
1532         self.offsets = {}
1533         self.offsets['signature'] = signature_offset
1534         self.offsets['share_hash_chain'] = sharehashes_offset
1535         self.offsets['block_hash_tree'] = blockhashes_offset
1536         self.offsets['share_data'] = sharedata_offset
1537         self.offsets['enc_privkey'] = encprivkey_offset
1538         self.offsets['EOF'] = eof_offset
1539         return final_share
1540
1541
1542     def write_sdmf_share_to_server(self,
1543                                    storage_index,
1544                                    empty=False):
1545         # Some tests need SDMF shares to verify that we can still 
1546         # read them. This method writes one, which resembles but is not
1547         assert self.rref
1548         write = self.ss.remote_slot_testv_and_readv_and_writev
1549         share = self.build_test_sdmf_share(empty)
1550         testvs = [(0, 1, "eq", "")]
1551         tws = {}
1552         tws[0] = (testvs, [(0, share)], None)
1553         readv = []
1554         results = write(storage_index, self.secrets, tws, readv)
1555         self.failUnless(results[0])
1556
1557
1558     def test_read(self):
1559         self.write_test_share_to_server("si1")
1560         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1561         # Check that every method equals what we expect it to.
1562         d = defer.succeed(None)
1563         def _check_block_and_salt((block, salt)):
1564             self.failUnlessEqual(block, self.block)
1565             self.failUnlessEqual(salt, self.salt)
1566
1567         for i in xrange(6):
1568             d.addCallback(lambda ignored, i=i:
1569                 mr.get_block_and_salt(i))
1570             d.addCallback(_check_block_and_salt)
1571
1572         d.addCallback(lambda ignored:
1573             mr.get_encprivkey())
1574         d.addCallback(lambda encprivkey:
1575             self.failUnlessEqual(self.encprivkey, encprivkey))
1576
1577         d.addCallback(lambda ignored:
1578             mr.get_blockhashes())
1579         d.addCallback(lambda blockhashes:
1580             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1581
1582         d.addCallback(lambda ignored:
1583             mr.get_sharehashes())
1584         d.addCallback(lambda sharehashes:
1585             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1586
1587         d.addCallback(lambda ignored:
1588             mr.get_signature())
1589         d.addCallback(lambda signature:
1590             self.failUnlessEqual(signature, self.signature))
1591
1592         d.addCallback(lambda ignored:
1593             mr.get_verification_key())
1594         d.addCallback(lambda verification_key:
1595             self.failUnlessEqual(verification_key, self.verification_key))
1596
1597         d.addCallback(lambda ignored:
1598             mr.get_seqnum())
1599         d.addCallback(lambda seqnum:
1600             self.failUnlessEqual(seqnum, 0))
1601
1602         d.addCallback(lambda ignored:
1603             mr.get_root_hash())
1604         d.addCallback(lambda root_hash:
1605             self.failUnlessEqual(self.root_hash, root_hash))
1606
1607         d.addCallback(lambda ignored:
1608             mr.get_seqnum())
1609         d.addCallback(lambda seqnum:
1610             self.failUnlessEqual(0, seqnum))
1611
1612         d.addCallback(lambda ignored:
1613             mr.get_encoding_parameters())
1614         def _check_encoding_parameters((k, n, segsize, datalen)):
1615             self.failUnlessEqual(k, 3)
1616             self.failUnlessEqual(n, 10)
1617             self.failUnlessEqual(segsize, 6)
1618             self.failUnlessEqual(datalen, 36)
1619         d.addCallback(_check_encoding_parameters)
1620
1621         d.addCallback(lambda ignored:
1622             mr.get_checkstring())
1623         d.addCallback(lambda checkstring:
1624             self.failUnlessEqual(checkstring, checkstring))
1625         return d
1626
1627
1628     def test_read_with_different_tail_segment_size(self):
1629         self.write_test_share_to_server("si1", tail_segment=True)
1630         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1631         d = mr.get_block_and_salt(5)
1632         def _check_tail_segment(results):
1633             block, salt = results
1634             self.failUnlessEqual(len(block), 1)
1635             self.failUnlessEqual(block, "a")
1636         d.addCallback(_check_tail_segment)
1637         return d
1638
1639
1640     def test_get_block_with_invalid_segnum(self):
1641         self.write_test_share_to_server("si1")
1642         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1643         d = defer.succeed(None)
1644         d.addCallback(lambda ignored:
1645             self.shouldFail(LayoutInvalid, "test invalid segnum",
1646                             None,
1647                             mr.get_block_and_salt, 7))
1648         return d
1649
1650
1651     def test_get_encoding_parameters_first(self):
1652         self.write_test_share_to_server("si1")
1653         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1654         d = mr.get_encoding_parameters()
1655         def _check_encoding_parameters((k, n, segment_size, datalen)):
1656             self.failUnlessEqual(k, 3)
1657             self.failUnlessEqual(n, 10)
1658             self.failUnlessEqual(segment_size, 6)
1659             self.failUnlessEqual(datalen, 36)
1660         d.addCallback(_check_encoding_parameters)
1661         return d
1662
1663
1664     def test_get_seqnum_first(self):
1665         self.write_test_share_to_server("si1")
1666         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1667         d = mr.get_seqnum()
1668         d.addCallback(lambda seqnum:
1669             self.failUnlessEqual(seqnum, 0))
1670         return d
1671
1672
1673     def test_get_root_hash_first(self):
1674         self.write_test_share_to_server("si1")
1675         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1676         d = mr.get_root_hash()
1677         d.addCallback(lambda root_hash:
1678             self.failUnlessEqual(root_hash, self.root_hash))
1679         return d
1680
1681
1682     def test_get_checkstring_first(self):
1683         self.write_test_share_to_server("si1")
1684         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1685         d = mr.get_checkstring()
1686         d.addCallback(lambda checkstring:
1687             self.failUnlessEqual(checkstring, self.checkstring))
1688         return d
1689
1690
1691     def test_write_read_vectors(self):
1692         # When writing for us, the storage server will return to us a
1693         # read vector, along with its result. If a write fails because
1694         # the test vectors failed, this read vector can help us to
1695         # diagnose the problem. This test ensures that the read vector
1696         # is working appropriately.
1697         mw = self._make_new_mw("si1", 0)
1698
1699         for i in xrange(6):
1700             mw.put_block(self.block, i, self.salt)
1701         mw.put_encprivkey(self.encprivkey)
1702         mw.put_blockhashes(self.block_hash_tree)
1703         mw.put_sharehashes(self.share_hash_chain)
1704         mw.put_root_hash(self.root_hash)
1705         mw.put_signature(self.signature)
1706         mw.put_verification_key(self.verification_key)
1707         d = mw.finish_publishing()
1708         def _then(results):
1709             self.failUnless(len(results), 2)
1710             result, readv = results
1711             self.failUnless(result)
1712             self.failIf(readv)
1713             self.old_checkstring = mw.get_checkstring()
1714             mw.set_checkstring("")
1715         d.addCallback(_then)
1716         d.addCallback(lambda ignored:
1717             mw.finish_publishing())
1718         def _then_again(results):
1719             self.failUnlessEqual(len(results), 2)
1720             result, readvs = results
1721             self.failIf(result)
1722             self.failUnlessIn(0, readvs)
1723             readv = readvs[0][0]
1724             self.failUnlessEqual(readv, self.old_checkstring)
1725         d.addCallback(_then_again)
1726         # The checkstring remains the same for the rest of the process.
1727         return d
1728
1729
1730     def test_private_key_after_share_hash_chain(self):
1731         mw = self._make_new_mw("si1", 0)
1732         d = defer.succeed(None)
1733         for i in xrange(6):
1734             d.addCallback(lambda ignored, i=i:
1735                 mw.put_block(self.block, i, self.salt))
1736         d.addCallback(lambda ignored:
1737             mw.put_encprivkey(self.encprivkey))
1738         d.addCallback(lambda ignored:
1739             mw.put_sharehashes(self.share_hash_chain))
1740
1741         # Now try to put the private key again.
1742         d.addCallback(lambda ignored:
1743             self.shouldFail(LayoutInvalid, "test repeat private key",
1744                             None,
1745                             mw.put_encprivkey, self.encprivkey))
1746         return d
1747
1748
1749     def test_signature_after_verification_key(self):
1750         mw = self._make_new_mw("si1", 0)
1751         d = defer.succeed(None)
1752         # Put everything up to and including the verification key.
1753         for i in xrange(6):
1754             d.addCallback(lambda ignored, i=i:
1755                 mw.put_block(self.block, i, self.salt))
1756         d.addCallback(lambda ignored:
1757             mw.put_encprivkey(self.encprivkey))
1758         d.addCallback(lambda ignored:
1759             mw.put_blockhashes(self.block_hash_tree))
1760         d.addCallback(lambda ignored:
1761             mw.put_sharehashes(self.share_hash_chain))
1762         d.addCallback(lambda ignored:
1763             mw.put_root_hash(self.root_hash))
1764         d.addCallback(lambda ignored:
1765             mw.put_signature(self.signature))
1766         d.addCallback(lambda ignored:
1767             mw.put_verification_key(self.verification_key))
1768         # Now try to put the signature again. This should fail
1769         d.addCallback(lambda ignored:
1770             self.shouldFail(LayoutInvalid, "signature after verification",
1771                             None,
1772                             mw.put_signature, self.signature))
1773         return d
1774
1775
1776     def test_uncoordinated_write(self):
1777         # Make two mutable writers, both pointing to the same storage
1778         # server, both at the same storage index, and try writing to the
1779         # same share.
1780         mw1 = self._make_new_mw("si1", 0)
1781         mw2 = self._make_new_mw("si1", 0)
1782
1783         def _check_success(results):
1784             result, readvs = results
1785             self.failUnless(result)
1786
1787         def _check_failure(results):
1788             result, readvs = results
1789             self.failIf(result)
1790
1791         def _write_share(mw):
1792             for i in xrange(6):
1793                 mw.put_block(self.block, i, self.salt)
1794             mw.put_encprivkey(self.encprivkey)
1795             mw.put_blockhashes(self.block_hash_tree)
1796             mw.put_sharehashes(self.share_hash_chain)
1797             mw.put_root_hash(self.root_hash)
1798             mw.put_signature(self.signature)
1799             mw.put_verification_key(self.verification_key)
1800             return mw.finish_publishing()
1801         d = _write_share(mw1)
1802         d.addCallback(_check_success)
1803         d.addCallback(lambda ignored:
1804             _write_share(mw2))
1805         d.addCallback(_check_failure)
1806         return d
1807
1808
1809     def test_invalid_salt_size(self):
1810         # Salts need to be 16 bytes in size. Writes that attempt to
1811         # write more or less than this should be rejected.
1812         mw = self._make_new_mw("si1", 0)
1813         invalid_salt = "a" * 17 # 17 bytes
1814         another_invalid_salt = "b" * 15 # 15 bytes
1815         d = defer.succeed(None)
1816         d.addCallback(lambda ignored:
1817             self.shouldFail(LayoutInvalid, "salt too big",
1818                             None,
1819                             mw.put_block, self.block, 0, invalid_salt))
1820         d.addCallback(lambda ignored:
1821             self.shouldFail(LayoutInvalid, "salt too small",
1822                             None,
1823                             mw.put_block, self.block, 0,
1824                             another_invalid_salt))
1825         return d
1826
1827
1828     def test_write_test_vectors(self):
1829         # If we give the write proxy a bogus test vector at 
1830         # any point during the process, it should fail to write when we 
1831         # tell it to write.
1832         def _check_failure(results):
1833             self.failUnlessEqual(len(results), 2)
1834             res, d = results
1835             self.failIf(res)
1836
1837         def _check_success(results):
1838             self.failUnlessEqual(len(results), 2)
1839             res, d = results
1840             self.failUnless(results)
1841
1842         mw = self._make_new_mw("si1", 0)
1843         mw.set_checkstring("this is a lie")
1844         for i in xrange(6):
1845             mw.put_block(self.block, i, self.salt)
1846         mw.put_encprivkey(self.encprivkey)
1847         mw.put_blockhashes(self.block_hash_tree)
1848         mw.put_sharehashes(self.share_hash_chain)
1849         mw.put_root_hash(self.root_hash)
1850         mw.put_signature(self.signature)
1851         mw.put_verification_key(self.verification_key)
1852         d = mw.finish_publishing()
1853         d.addCallback(_check_failure)
1854         d.addCallback(lambda ignored:
1855             mw.set_checkstring(""))
1856         d.addCallback(lambda ignored:
1857             mw.finish_publishing())
1858         d.addCallback(_check_success)
1859         return d
1860
1861
1862     def serialize_blockhashes(self, blockhashes):
1863         return "".join(blockhashes)
1864
1865
1866     def serialize_sharehashes(self, sharehashes):
1867         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1868                         for i in sorted(sharehashes.keys())])
1869         return ret
1870
1871
1872     def test_write(self):
1873         # This translates to a file with 6 6-byte segments, and with 2-byte
1874         # blocks.
1875         mw = self._make_new_mw("si1", 0)
1876         # Test writing some blocks.
1877         read = self.ss.remote_slot_readv
1878         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1879         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1880                                     PRIVATE_KEY_SIZE + \
1881                                     SIGNATURE_SIZE + \
1882                                     VERIFICATION_KEY_SIZE + \
1883                                     SHARE_HASH_CHAIN_SIZE
1884         written_block_size = 2 + len(self.salt)
1885         written_block = self.block + self.salt
1886         for i in xrange(6):
1887             mw.put_block(self.block, i, self.salt)
1888
1889         mw.put_encprivkey(self.encprivkey)
1890         mw.put_blockhashes(self.block_hash_tree)
1891         mw.put_sharehashes(self.share_hash_chain)
1892         mw.put_root_hash(self.root_hash)
1893         mw.put_signature(self.signature)
1894         mw.put_verification_key(self.verification_key)
1895         d = mw.finish_publishing()
1896         def _check_publish(results):
1897             self.failUnlessEqual(len(results), 2)
1898             result, ign = results
1899             self.failUnless(result, "publish failed")
1900             for i in xrange(6):
1901                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1902                                 {0: [written_block]})
1903
1904             self.failUnlessEqual(len(self.encprivkey), 7)
1905             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1906                                  {0: [self.encprivkey]})
1907
1908             expected_block_hash_offset = expected_sharedata_offset + \
1909                         (6 * written_block_size)
1910             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1911             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1912                                  {0: [self.block_hash_tree_s]})
1913
1914             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1915             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1916                                  {0: [self.share_hash_chain_s]})
1917
1918             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1919                                  {0: [self.root_hash]})
1920             expected_signature_offset = expected_share_hash_offset + \
1921                 len(self.share_hash_chain_s)
1922             self.failUnlessEqual(len(self.signature), 9)
1923             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1924                                  {0: [self.signature]})
1925
1926             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1927             self.failUnlessEqual(len(self.verification_key), 6)
1928             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1929                                  {0: [self.verification_key]})
1930
1931             signable = mw.get_signable()
1932             verno, seq, roothash, k, n, segsize, datalen = \
1933                                             struct.unpack(">BQ32sBBQQ",
1934                                                           signable)
1935             self.failUnlessEqual(verno, 1)
1936             self.failUnlessEqual(seq, 0)
1937             self.failUnlessEqual(roothash, self.root_hash)
1938             self.failUnlessEqual(k, 3)
1939             self.failUnlessEqual(n, 10)
1940             self.failUnlessEqual(segsize, 6)
1941             self.failUnlessEqual(datalen, 36)
1942             expected_eof_offset = expected_block_hash_offset + \
1943                 len(self.block_hash_tree_s)
1944
1945             # Check the version number to make sure that it is correct.
1946             expected_version_number = struct.pack(">B", 1)
1947             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1948                                  {0: [expected_version_number]})
1949             # Check the sequence number to make sure that it is correct
1950             expected_sequence_number = struct.pack(">Q", 0)
1951             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1952                                  {0: [expected_sequence_number]})
1953             # Check that the encoding parameters (k, N, segement size, data
1954             # length) are what they should be. These are  3, 10, 6, 36
1955             expected_k = struct.pack(">B", 3)
1956             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1957                                  {0: [expected_k]})
1958             expected_n = struct.pack(">B", 10)
1959             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1960                                  {0: [expected_n]})
1961             expected_segment_size = struct.pack(">Q", 6)
1962             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1963                                  {0: [expected_segment_size]})
1964             expected_data_length = struct.pack(">Q", 36)
1965             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1966                                  {0: [expected_data_length]})
1967             expected_offset = struct.pack(">Q", expected_private_key_offset)
1968             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1969                                  {0: [expected_offset]})
1970             expected_offset = struct.pack(">Q", expected_share_hash_offset)
1971             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1972                                  {0: [expected_offset]})
1973             expected_offset = struct.pack(">Q", expected_signature_offset)
1974             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1975                                  {0: [expected_offset]})
1976             expected_offset = struct.pack(">Q", expected_verification_key_offset)
1977             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1978                                  {0: [expected_offset]})
1979             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1980             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1981                                  {0: [expected_offset]})
1982             expected_offset = struct.pack(">Q", expected_sharedata_offset)
1983             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1984                                  {0: [expected_offset]})
1985             expected_offset = struct.pack(">Q", expected_block_hash_offset)
1986             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1987                                  {0: [expected_offset]})
1988             expected_offset = struct.pack(">Q", expected_eof_offset)
1989             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1990                                  {0: [expected_offset]})
1991         d.addCallback(_check_publish)
1992         return d
1993
1994     def _make_new_mw(self, si, share, datalength=36):
1995         # This is a file of size 36 bytes. Since it has a segment
1996         # size of 6, we know that it has 6 byte segments, which will
1997         # be split into blocks of 2 bytes because our FEC k
1998         # parameter is 3.
1999         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2000                                 6, datalength)
2001         return mw
2002
2003
2004     def test_write_rejected_with_too_many_blocks(self):
2005         mw = self._make_new_mw("si0", 0)
2006
2007         # Try writing too many blocks. We should not be able to write
2008         # more than 6
2009         # blocks into each share.
2010         d = defer.succeed(None)
2011         for i in xrange(6):
2012             d.addCallback(lambda ignored, i=i:
2013                 mw.put_block(self.block, i, self.salt))
2014         d.addCallback(lambda ignored:
2015             self.shouldFail(LayoutInvalid, "too many blocks",
2016                             None,
2017                             mw.put_block, self.block, 7, self.salt))
2018         return d
2019
2020
2021     def test_write_rejected_with_invalid_salt(self):
2022         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2023         # less should cause an error.
2024         mw = self._make_new_mw("si1", 0)
2025         bad_salt = "a" * 17 # 17 bytes
2026         d = defer.succeed(None)
2027         d.addCallback(lambda ignored:
2028             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2029                             None, mw.put_block, self.block, 7, bad_salt))
2030         return d
2031
2032
2033     def test_write_rejected_with_invalid_root_hash(self):
2034         # Try writing an invalid root hash. This should be SHA256d, and
2035         # 32 bytes long as a result.
2036         mw = self._make_new_mw("si2", 0)
2037         # 17 bytes != 32 bytes
2038         invalid_root_hash = "a" * 17
2039         d = defer.succeed(None)
2040         # Before this test can work, we need to put some blocks + salts,
2041         # a block hash tree, and a share hash tree. Otherwise, we'll see
2042         # failures that match what we are looking for, but are caused by
2043         # the constraints imposed on operation ordering.
2044         for i in xrange(6):
2045             d.addCallback(lambda ignored, i=i:
2046                 mw.put_block(self.block, i, self.salt))
2047         d.addCallback(lambda ignored:
2048             mw.put_encprivkey(self.encprivkey))
2049         d.addCallback(lambda ignored:
2050             mw.put_blockhashes(self.block_hash_tree))
2051         d.addCallback(lambda ignored:
2052             mw.put_sharehashes(self.share_hash_chain))
2053         d.addCallback(lambda ignored:
2054             self.shouldFail(LayoutInvalid, "invalid root hash",
2055                             None, mw.put_root_hash, invalid_root_hash))
2056         return d
2057
2058
2059     def test_write_rejected_with_invalid_blocksize(self):
2060         # The blocksize implied by the writer that we get from
2061         # _make_new_mw is 2bytes -- any more or any less than this
2062         # should be cause for failure, unless it is the tail segment, in
2063         # which case it may not be failure.
2064         invalid_block = "a"
2065         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2066                                              # one byte blocks
2067         # 1 bytes != 2 bytes
2068         d = defer.succeed(None)
2069         d.addCallback(lambda ignored, invalid_block=invalid_block:
2070             self.shouldFail(LayoutInvalid, "test blocksize too small",
2071                             None, mw.put_block, invalid_block, 0,
2072                             self.salt))
2073         invalid_block = invalid_block * 3
2074         # 3 bytes != 2 bytes
2075         d.addCallback(lambda ignored:
2076             self.shouldFail(LayoutInvalid, "test blocksize too large",
2077                             None,
2078                             mw.put_block, invalid_block, 0, self.salt))
2079         for i in xrange(5):
2080             d.addCallback(lambda ignored, i=i:
2081                 mw.put_block(self.block, i, self.salt))
2082         # Try to put an invalid tail segment
2083         d.addCallback(lambda ignored:
2084             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2085                             None,
2086                             mw.put_block, self.block, 5, self.salt))
2087         valid_block = "a"
2088         d.addCallback(lambda ignored:
2089             mw.put_block(valid_block, 5, self.salt))
2090         return d
2091
2092
2093     def test_write_enforces_order_constraints(self):
2094         # We require that the MDMFSlotWriteProxy be interacted with in a
2095         # specific way.
2096         # That way is:
2097         # 0: __init__
2098         # 1: write blocks and salts
2099         # 2: Write the encrypted private key
2100         # 3: Write the block hashes
2101         # 4: Write the share hashes
2102         # 5: Write the root hash and salt hash
2103         # 6: Write the signature and verification key
2104         # 7: Write the file.
2105         # 
2106         # Some of these can be performed out-of-order, and some can't.
2107         # The dependencies that I want to test here are:
2108         #  - Private key before block hashes
2109         #  - share hashes and block hashes before root hash
2110         #  - root hash before signature
2111         #  - signature before verification key
2112         mw0 = self._make_new_mw("si0", 0)
2113         # Write some shares
2114         d = defer.succeed(None)
2115         for i in xrange(6):
2116             d.addCallback(lambda ignored, i=i:
2117                 mw0.put_block(self.block, i, self.salt))
2118
2119         # Try to write the share hash chain without writing the
2120         # encrypted private key
2121         d.addCallback(lambda ignored:
2122             self.shouldFail(LayoutInvalid, "share hash chain before "
2123                                            "private key",
2124                             None,
2125                             mw0.put_sharehashes, self.share_hash_chain))
2126         # Write the private key.
2127         d.addCallback(lambda ignored:
2128             mw0.put_encprivkey(self.encprivkey))
2129
2130         # Now write the block hashes and try again
2131         d.addCallback(lambda ignored:
2132             mw0.put_blockhashes(self.block_hash_tree))
2133
2134         # We haven't yet put the root hash on the share, so we shouldn't
2135         # be able to sign it.
2136         d.addCallback(lambda ignored:
2137             self.shouldFail(LayoutInvalid, "signature before root hash",
2138                             None, mw0.put_signature, self.signature))
2139
2140         d.addCallback(lambda ignored:
2141             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2142
2143         # ..and, since that fails, we also shouldn't be able to put the
2144         # verification key.
2145         d.addCallback(lambda ignored:
2146             self.shouldFail(LayoutInvalid, "key before signature",
2147                             None, mw0.put_verification_key,
2148                             self.verification_key))
2149
2150         # Now write the share hashes.
2151         d.addCallback(lambda ignored:
2152             mw0.put_sharehashes(self.share_hash_chain))
2153         # We should be able to write the root hash now too
2154         d.addCallback(lambda ignored:
2155             mw0.put_root_hash(self.root_hash))
2156
2157         # We should still be unable to put the verification key
2158         d.addCallback(lambda ignored:
2159             self.shouldFail(LayoutInvalid, "key before signature",
2160                             None, mw0.put_verification_key,
2161                             self.verification_key))
2162
2163         d.addCallback(lambda ignored:
2164             mw0.put_signature(self.signature))
2165
2166         # We shouldn't be able to write the offsets to the remote server
2167         # until the offset table is finished; IOW, until we have written
2168         # the verification key.
2169         d.addCallback(lambda ignored:
2170             self.shouldFail(LayoutInvalid, "offsets before verification key",
2171                             None,
2172                             mw0.finish_publishing))
2173
2174         d.addCallback(lambda ignored:
2175             mw0.put_verification_key(self.verification_key))
2176         return d
2177
2178
2179     def test_end_to_end(self):
2180         mw = self._make_new_mw("si1", 0)
2181         # Write a share using the mutable writer, and make sure that the
2182         # reader knows how to read everything back to us.
2183         d = defer.succeed(None)
2184         for i in xrange(6):
2185             d.addCallback(lambda ignored, i=i:
2186                 mw.put_block(self.block, i, self.salt))
2187         d.addCallback(lambda ignored:
2188             mw.put_encprivkey(self.encprivkey))
2189         d.addCallback(lambda ignored:
2190             mw.put_blockhashes(self.block_hash_tree))
2191         d.addCallback(lambda ignored:
2192             mw.put_sharehashes(self.share_hash_chain))
2193         d.addCallback(lambda ignored:
2194             mw.put_root_hash(self.root_hash))
2195         d.addCallback(lambda ignored:
2196             mw.put_signature(self.signature))
2197         d.addCallback(lambda ignored:
2198             mw.put_verification_key(self.verification_key))
2199         d.addCallback(lambda ignored:
2200             mw.finish_publishing())
2201
2202         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2203         def _check_block_and_salt((block, salt)):
2204             self.failUnlessEqual(block, self.block)
2205             self.failUnlessEqual(salt, self.salt)
2206
2207         for i in xrange(6):
2208             d.addCallback(lambda ignored, i=i:
2209                 mr.get_block_and_salt(i))
2210             d.addCallback(_check_block_and_salt)
2211
2212         d.addCallback(lambda ignored:
2213             mr.get_encprivkey())
2214         d.addCallback(lambda encprivkey:
2215             self.failUnlessEqual(self.encprivkey, encprivkey))
2216
2217         d.addCallback(lambda ignored:
2218             mr.get_blockhashes())
2219         d.addCallback(lambda blockhashes:
2220             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2221
2222         d.addCallback(lambda ignored:
2223             mr.get_sharehashes())
2224         d.addCallback(lambda sharehashes:
2225             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2226
2227         d.addCallback(lambda ignored:
2228             mr.get_signature())
2229         d.addCallback(lambda signature:
2230             self.failUnlessEqual(signature, self.signature))
2231
2232         d.addCallback(lambda ignored:
2233             mr.get_verification_key())
2234         d.addCallback(lambda verification_key:
2235             self.failUnlessEqual(verification_key, self.verification_key))
2236
2237         d.addCallback(lambda ignored:
2238             mr.get_seqnum())
2239         d.addCallback(lambda seqnum:
2240             self.failUnlessEqual(seqnum, 0))
2241
2242         d.addCallback(lambda ignored:
2243             mr.get_root_hash())
2244         d.addCallback(lambda root_hash:
2245             self.failUnlessEqual(self.root_hash, root_hash))
2246
2247         d.addCallback(lambda ignored:
2248             mr.get_encoding_parameters())
2249         def _check_encoding_parameters((k, n, segsize, datalen)):
2250             self.failUnlessEqual(k, 3)
2251             self.failUnlessEqual(n, 10)
2252             self.failUnlessEqual(segsize, 6)
2253             self.failUnlessEqual(datalen, 36)
2254         d.addCallback(_check_encoding_parameters)
2255
2256         d.addCallback(lambda ignored:
2257             mr.get_checkstring())
2258         d.addCallback(lambda checkstring:
2259             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2260         return d
2261
2262
2263     def test_is_sdmf(self):
2264         # The MDMFSlotReadProxy should also know how to read SDMF files,
2265         # since it will encounter them on the grid. Callers use the
2266         # is_sdmf method to test this.
2267         self.write_sdmf_share_to_server("si1")
2268         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2269         d = mr.is_sdmf()
2270         d.addCallback(lambda issdmf:
2271             self.failUnless(issdmf))
2272         return d
2273
2274
2275     def test_reads_sdmf(self):
2276         # The slot read proxy should, naturally, know how to tell us
2277         # about data in the SDMF format
2278         self.write_sdmf_share_to_server("si1")
2279         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2280         d = defer.succeed(None)
2281         d.addCallback(lambda ignored:
2282             mr.is_sdmf())
2283         d.addCallback(lambda issdmf:
2284             self.failUnless(issdmf))
2285
2286         # What do we need to read?
2287         #  - The sharedata
2288         #  - The salt
2289         d.addCallback(lambda ignored:
2290             mr.get_block_and_salt(0))
2291         def _check_block_and_salt(results):
2292             block, salt = results
2293             # Our original file is 36 bytes long. Then each share is 12
2294             # bytes in size. The share is composed entirely of the
2295             # letter a. self.block contains 2 as, so 6 * self.block is
2296             # what we are looking for.
2297             self.failUnlessEqual(block, self.block * 6)
2298             self.failUnlessEqual(salt, self.salt)
2299         d.addCallback(_check_block_and_salt)
2300
2301         #  - The blockhashes
2302         d.addCallback(lambda ignored:
2303             mr.get_blockhashes())
2304         d.addCallback(lambda blockhashes:
2305             self.failUnlessEqual(self.block_hash_tree,
2306                                  blockhashes,
2307                                  blockhashes))
2308         #  - The sharehashes
2309         d.addCallback(lambda ignored:
2310             mr.get_sharehashes())
2311         d.addCallback(lambda sharehashes:
2312             self.failUnlessEqual(self.share_hash_chain,
2313                                  sharehashes))
2314         #  - The keys
2315         d.addCallback(lambda ignored:
2316             mr.get_encprivkey())
2317         d.addCallback(lambda encprivkey:
2318             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2319         d.addCallback(lambda ignored:
2320             mr.get_verification_key())
2321         d.addCallback(lambda verification_key:
2322             self.failUnlessEqual(verification_key,
2323                                  self.verification_key,
2324                                  verification_key))
2325         #  - The signature
2326         d.addCallback(lambda ignored:
2327             mr.get_signature())
2328         d.addCallback(lambda signature:
2329             self.failUnlessEqual(signature, self.signature, signature))
2330
2331         #  - The sequence number
2332         d.addCallback(lambda ignored:
2333             mr.get_seqnum())
2334         d.addCallback(lambda seqnum:
2335             self.failUnlessEqual(seqnum, 0, seqnum))
2336
2337         #  - The root hash
2338         d.addCallback(lambda ignored:
2339             mr.get_root_hash())
2340         d.addCallback(lambda root_hash:
2341             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2342         return d
2343
2344
2345     def test_only_reads_one_segment_sdmf(self):
2346         # SDMF shares have only one segment, so it doesn't make sense to
2347         # read more segments than that. The reader should know this and
2348         # complain if we try to do that.
2349         self.write_sdmf_share_to_server("si1")
2350         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2351         d = defer.succeed(None)
2352         d.addCallback(lambda ignored:
2353             mr.is_sdmf())
2354         d.addCallback(lambda issdmf:
2355             self.failUnless(issdmf))
2356         d.addCallback(lambda ignored:
2357             self.shouldFail(LayoutInvalid, "test bad segment",
2358                             None,
2359                             mr.get_block_and_salt, 1))
2360         return d
2361
2362
2363     def test_read_with_prefetched_mdmf_data(self):
2364         # The MDMFSlotReadProxy will prefill certain fields if you pass
2365         # it data that you have already fetched. This is useful for
2366         # cases like the Servermap, which prefetches ~2kb of data while
2367         # finding out which shares are on the remote peer so that it
2368         # doesn't waste round trips.
2369         mdmf_data = self.build_test_mdmf_share()
2370         self.write_test_share_to_server("si1")
2371         def _make_mr(ignored, length):
2372             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2373             return mr
2374
2375         d = defer.succeed(None)
2376         # This should be enough to fill in both the encoding parameters
2377         # and the table of offsets, which will complete the version
2378         # information tuple.
2379         d.addCallback(_make_mr, 123)
2380         d.addCallback(lambda mr:
2381             mr.get_verinfo())
2382         def _check_verinfo(verinfo):
2383             self.failUnless(verinfo)
2384             self.failUnlessEqual(len(verinfo), 9)
2385             (seqnum,
2386              root_hash,
2387              salt_hash,
2388              segsize,
2389              datalen,
2390              k,
2391              n,
2392              prefix,
2393              offsets) = verinfo
2394             self.failUnlessEqual(seqnum, 0)
2395             self.failUnlessEqual(root_hash, self.root_hash)
2396             self.failUnlessEqual(segsize, 6)
2397             self.failUnlessEqual(datalen, 36)
2398             self.failUnlessEqual(k, 3)
2399             self.failUnlessEqual(n, 10)
2400             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2401                                           1,
2402                                           seqnum,
2403                                           root_hash,
2404                                           k,
2405                                           n,
2406                                           segsize,
2407                                           datalen)
2408             self.failUnlessEqual(expected_prefix, prefix)
2409             self.failUnlessEqual(self.rref.read_count, 0)
2410         d.addCallback(_check_verinfo)
2411         # This is not enough data to read a block and a share, so the
2412         # wrapper should attempt to read this from the remote server.
2413         d.addCallback(_make_mr, 123)
2414         d.addCallback(lambda mr:
2415             mr.get_block_and_salt(0))
2416         def _check_block_and_salt((block, salt)):
2417             self.failUnlessEqual(block, self.block)
2418             self.failUnlessEqual(salt, self.salt)
2419             self.failUnlessEqual(self.rref.read_count, 1)
2420         # This should be enough data to read one block.
2421         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2422         d.addCallback(lambda mr:
2423             mr.get_block_and_salt(0))
2424         d.addCallback(_check_block_and_salt)
2425         return d
2426
2427
2428     def test_read_with_prefetched_sdmf_data(self):
2429         sdmf_data = self.build_test_sdmf_share()
2430         self.write_sdmf_share_to_server("si1")
2431         def _make_mr(ignored, length):
2432             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2433             return mr
2434
2435         d = defer.succeed(None)
2436         # This should be enough to get us the encoding parameters,
2437         # offset table, and everything else we need to build a verinfo
2438         # string.
2439         d.addCallback(_make_mr, 123)
2440         d.addCallback(lambda mr:
2441             mr.get_verinfo())
2442         def _check_verinfo(verinfo):
2443             self.failUnless(verinfo)
2444             self.failUnlessEqual(len(verinfo), 9)
2445             (seqnum,
2446              root_hash,
2447              salt,
2448              segsize,
2449              datalen,
2450              k,
2451              n,
2452              prefix,
2453              offsets) = verinfo
2454             self.failUnlessEqual(seqnum, 0)
2455             self.failUnlessEqual(root_hash, self.root_hash)
2456             self.failUnlessEqual(salt, self.salt)
2457             self.failUnlessEqual(segsize, 36)
2458             self.failUnlessEqual(datalen, 36)
2459             self.failUnlessEqual(k, 3)
2460             self.failUnlessEqual(n, 10)
2461             expected_prefix = struct.pack(SIGNED_PREFIX,
2462                                           0,
2463                                           seqnum,
2464                                           root_hash,
2465                                           salt,
2466                                           k,
2467                                           n,
2468                                           segsize,
2469                                           datalen)
2470             self.failUnlessEqual(expected_prefix, prefix)
2471             self.failUnlessEqual(self.rref.read_count, 0)
2472         d.addCallback(_check_verinfo)
2473         # This shouldn't be enough to read any share data.
2474         d.addCallback(_make_mr, 123)
2475         d.addCallback(lambda mr:
2476             mr.get_block_and_salt(0))
2477         def _check_block_and_salt((block, salt)):
2478             self.failUnlessEqual(block, self.block * 6)
2479             self.failUnlessEqual(salt, self.salt)
2480             # TODO: Fix the read routine so that it reads only the data
2481             #       that it has cached if it can't read all of it.
2482             self.failUnlessEqual(self.rref.read_count, 2)
2483
2484         # This should be enough to read share data.
2485         d.addCallback(_make_mr, self.offsets['share_data'])
2486         d.addCallback(lambda mr:
2487             mr.get_block_and_salt(0))
2488         d.addCallback(_check_block_and_salt)
2489         return d
2490
2491
2492     def test_read_with_empty_mdmf_file(self):
2493         # Some tests upload a file with no contents to test things
2494         # unrelated to the actual handling of the content of the file.
2495         # The reader should behave intelligently in these cases.
2496         self.write_test_share_to_server("si1", empty=True)
2497         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2498         # We should be able to get the encoding parameters, and they
2499         # should be correct.
2500         d = defer.succeed(None)
2501         d.addCallback(lambda ignored:
2502             mr.get_encoding_parameters())
2503         def _check_encoding_parameters(params):
2504             self.failUnlessEqual(len(params), 4)
2505             k, n, segsize, datalen = params
2506             self.failUnlessEqual(k, 3)
2507             self.failUnlessEqual(n, 10)
2508             self.failUnlessEqual(segsize, 0)
2509             self.failUnlessEqual(datalen, 0)
2510         d.addCallback(_check_encoding_parameters)
2511
2512         # We should not be able to fetch a block, since there are no
2513         # blocks to fetch
2514         d.addCallback(lambda ignored:
2515             self.shouldFail(LayoutInvalid, "get block on empty file",
2516                             None,
2517                             mr.get_block_and_salt, 0))
2518         return d
2519
2520
2521     def test_read_with_empty_sdmf_file(self):
2522         self.write_sdmf_share_to_server("si1", empty=True)
2523         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2524         # We should be able to get the encoding parameters, and they
2525         # should be correct
2526         d = defer.succeed(None)
2527         d.addCallback(lambda ignored:
2528             mr.get_encoding_parameters())
2529         def _check_encoding_parameters(params):
2530             self.failUnlessEqual(len(params), 4)
2531             k, n, segsize, datalen = params
2532             self.failUnlessEqual(k, 3)
2533             self.failUnlessEqual(n, 10)
2534             self.failUnlessEqual(segsize, 0)
2535             self.failUnlessEqual(datalen, 0)
2536         d.addCallback(_check_encoding_parameters)
2537
2538         # It does not make sense to get a block in this format, so we
2539         # should not be able to.
2540         d.addCallback(lambda ignored:
2541             self.shouldFail(LayoutInvalid, "get block on an empty file",
2542                             None,
2543                             mr.get_block_and_salt, 0))
2544         return d
2545
2546
2547     def test_verinfo_with_sdmf_file(self):
2548         self.write_sdmf_share_to_server("si1")
2549         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2550         # We should be able to get the version information.
2551         d = defer.succeed(None)
2552         d.addCallback(lambda ignored:
2553             mr.get_verinfo())
2554         def _check_verinfo(verinfo):
2555             self.failUnless(verinfo)
2556             self.failUnlessEqual(len(verinfo), 9)
2557             (seqnum,
2558              root_hash,
2559              salt,
2560              segsize,
2561              datalen,
2562              k,
2563              n,
2564              prefix,
2565              offsets) = verinfo
2566             self.failUnlessEqual(seqnum, 0)
2567             self.failUnlessEqual(root_hash, self.root_hash)
2568             self.failUnlessEqual(salt, self.salt)
2569             self.failUnlessEqual(segsize, 36)
2570             self.failUnlessEqual(datalen, 36)
2571             self.failUnlessEqual(k, 3)
2572             self.failUnlessEqual(n, 10)
2573             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2574                                           0,
2575                                           seqnum,
2576                                           root_hash,
2577                                           salt,
2578                                           k,
2579                                           n,
2580                                           segsize,
2581                                           datalen)
2582             self.failUnlessEqual(prefix, expected_prefix)
2583             self.failUnlessEqual(offsets, self.offsets)
2584         d.addCallback(_check_verinfo)
2585         return d
2586
2587
2588     def test_verinfo_with_mdmf_file(self):
2589         self.write_test_share_to_server("si1")
2590         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2591         d = defer.succeed(None)
2592         d.addCallback(lambda ignored:
2593             mr.get_verinfo())
2594         def _check_verinfo(verinfo):
2595             self.failUnless(verinfo)
2596             self.failUnlessEqual(len(verinfo), 9)
2597             (seqnum,
2598              root_hash,
2599              IV,
2600              segsize,
2601              datalen,
2602              k,
2603              n,
2604              prefix,
2605              offsets) = verinfo
2606             self.failUnlessEqual(seqnum, 0)
2607             self.failUnlessEqual(root_hash, self.root_hash)
2608             self.failIf(IV)
2609             self.failUnlessEqual(segsize, 6)
2610             self.failUnlessEqual(datalen, 36)
2611             self.failUnlessEqual(k, 3)
2612             self.failUnlessEqual(n, 10)
2613             expected_prefix = struct.pack(">BQ32s BBQQ",
2614                                           1,
2615                                           seqnum,
2616                                           root_hash,
2617                                           k,
2618                                           n,
2619                                           segsize,
2620                                           datalen)
2621             self.failUnlessEqual(prefix, expected_prefix)
2622             self.failUnlessEqual(offsets, self.offsets)
2623         d.addCallback(_check_verinfo)
2624         return d
2625
2626
2627     def test_sdmf_writer(self):
2628         # Go through the motions of writing an SDMF share to the storage
2629         # server. Then read the storage server to see that the share got
2630         # written in the way that we think it should have. 
2631
2632         # We do this first so that the necessary instance variables get
2633         # set the way we want them for the tests below.
2634         data = self.build_test_sdmf_share()
2635         sdmfr = SDMFSlotWriteProxy(0,
2636                                    self.rref,
2637                                    "si1",
2638                                    self.secrets,
2639                                    0, 3, 10, 36, 36)
2640         # Put the block and salt.
2641         sdmfr.put_block(self.blockdata, 0, self.salt)
2642
2643         # Put the encprivkey
2644         sdmfr.put_encprivkey(self.encprivkey)
2645
2646         # Put the block and share hash chains
2647         sdmfr.put_blockhashes(self.block_hash_tree)
2648         sdmfr.put_sharehashes(self.share_hash_chain)
2649         sdmfr.put_root_hash(self.root_hash)
2650
2651         # Put the signature
2652         sdmfr.put_signature(self.signature)
2653
2654         # Put the verification key
2655         sdmfr.put_verification_key(self.verification_key)
2656
2657         # Now check to make sure that nothing has been written yet.
2658         self.failUnlessEqual(self.rref.write_count, 0)
2659
2660         # Now finish publishing
2661         d = sdmfr.finish_publishing()
2662         def _then(ignored):
2663             self.failUnlessEqual(self.rref.write_count, 1)
2664             read = self.ss.remote_slot_readv
2665             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2666                                  {0: [data]})
2667         d.addCallback(_then)
2668         return d
2669
2670
2671     def test_sdmf_writer_preexisting_share(self):
2672         data = self.build_test_sdmf_share()
2673         self.write_sdmf_share_to_server("si1")
2674
2675         # Now there is a share on the storage server. To successfully
2676         # write, we need to set the checkstring correctly. When we
2677         # don't, no write should occur.
2678         sdmfw = SDMFSlotWriteProxy(0,
2679                                    self.rref,
2680                                    "si1",
2681                                    self.secrets,
2682                                    1, 3, 10, 36, 36)
2683         sdmfw.put_block(self.blockdata, 0, self.salt)
2684
2685         # Put the encprivkey
2686         sdmfw.put_encprivkey(self.encprivkey)
2687
2688         # Put the block and share hash chains
2689         sdmfw.put_blockhashes(self.block_hash_tree)
2690         sdmfw.put_sharehashes(self.share_hash_chain)
2691
2692         # Put the root hash
2693         sdmfw.put_root_hash(self.root_hash)
2694
2695         # Put the signature
2696         sdmfw.put_signature(self.signature)
2697
2698         # Put the verification key
2699         sdmfw.put_verification_key(self.verification_key)
2700
2701         # We shouldn't have a checkstring yet
2702         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2703
2704         d = sdmfw.finish_publishing()
2705         def _then(results):
2706             self.failIf(results[0])
2707             # this is the correct checkstring
2708             self._expected_checkstring = results[1][0][0]
2709             return self._expected_checkstring
2710
2711         d.addCallback(_then)
2712         d.addCallback(sdmfw.set_checkstring)
2713         d.addCallback(lambda ignored:
2714             sdmfw.get_checkstring())
2715         d.addCallback(lambda checkstring:
2716             self.failUnlessEqual(checkstring, self._expected_checkstring))
2717         d.addCallback(lambda ignored:
2718             sdmfw.finish_publishing())
2719         def _then_again(results):
2720             self.failUnless(results[0])
2721             read = self.ss.remote_slot_readv
2722             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2723                                  {0: [struct.pack(">Q", 1)]})
2724             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2725                                  {0: [data[9:]]})
2726         d.addCallback(_then_again)
2727         return d
2728
2729
2730 class Stats(unittest.TestCase):
2731
2732     def setUp(self):
2733         self.sparent = LoggingServiceParent()
2734         self._lease_secret = itertools.count()
2735     def tearDown(self):
2736         return self.sparent.stopService()
2737
2738     def workdir(self, name):
2739         basedir = os.path.join("storage", "Server", name)
2740         return basedir
2741
2742     def create(self, name):
2743         workdir = self.workdir(name)
2744         ss = StorageServer(workdir, "\x00" * 20)
2745         ss.setServiceParent(self.sparent)
2746         return ss
2747
2748     def test_latencies(self):
2749         ss = self.create("test_latencies")
2750         for i in range(10000):
2751             ss.add_latency("allocate", 1.0 * i)
2752         for i in range(1000):
2753             ss.add_latency("renew", 1.0 * i)
2754         for i in range(20):
2755             ss.add_latency("write", 1.0 * i)
2756         for i in range(10):
2757             ss.add_latency("cancel", 2.0 * i)
2758         ss.add_latency("get", 5.0)
2759
2760         output = ss.get_latencies()
2761
2762         self.failUnlessEqual(sorted(output.keys()),
2763                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2764         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2765         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2766         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2767         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2768         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2769         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2770         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2771         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2772         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2773
2774         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2775         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2776         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2777         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2778         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2779         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2780         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2781         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2782         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2783
2784         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2785         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2786         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2787         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2788         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2789         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2790         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2791         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2792         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2793
2794         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2795         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2796         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2797         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2798         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2799         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2800         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2801         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2802         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2803
2804         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2805         self.failUnless(output["get"]["mean"] is None, output)
2806         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2807         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2808         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2809         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2810         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2811         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2812         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2813
2814 def remove_tags(s):
2815     s = re.sub(r'<[^>]*>', ' ', s)
2816     s = re.sub(r'\s+', ' ', s)
2817     return s
2818
2819 class MyBucketCountingCrawler(BucketCountingCrawler):
2820     def finished_prefix(self, cycle, prefix):
2821         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2822         if self.hook_ds:
2823             d = self.hook_ds.pop(0)
2824             d.callback(None)
2825
2826 class MyStorageServer(StorageServer):
2827     def add_bucket_counter(self):
2828         statefile = os.path.join(self.storedir, "bucket_counter.state")
2829         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2830         self.bucket_counter.setServiceParent(self)
2831
2832 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2833
2834     def setUp(self):
2835         self.s = service.MultiService()
2836         self.s.startService()
2837     def tearDown(self):
2838         return self.s.stopService()
2839
2840     def test_bucket_counter(self):
2841         basedir = "storage/BucketCounter/bucket_counter"
2842         fileutil.make_dirs(basedir)
2843         ss = StorageServer(basedir, "\x00" * 20)
2844         # to make sure we capture the bucket-counting-crawler in the middle
2845         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2846         # also make it start sooner than usual.
2847         ss.bucket_counter.slow_start = 0
2848         orig_cpu_slice = ss.bucket_counter.cpu_slice
2849         ss.bucket_counter.cpu_slice = 0
2850         ss.setServiceParent(self.s)
2851
2852         w = StorageStatus(ss)
2853
2854         # this sample is before the crawler has started doing anything
2855         html = w.renderSynchronously()
2856         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2857         s = remove_tags(html)
2858         self.failUnlessIn("Accepting new shares: Yes", s)
2859         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2860         self.failUnlessIn("Total buckets: Not computed yet", s)
2861         self.failUnlessIn("Next crawl in", s)
2862
2863         # give the bucket-counting-crawler one tick to get started. The
2864         # cpu_slice=0 will force it to yield right after it processes the
2865         # first prefix
2866
2867         d = fireEventually()
2868         def _check(ignored):
2869             # are we really right after the first prefix?
2870             state = ss.bucket_counter.get_state()
2871             if state["last-complete-prefix"] is None:
2872                 d2 = fireEventually()
2873                 d2.addCallback(_check)
2874                 return d2
2875             self.failUnlessEqual(state["last-complete-prefix"],
2876                                  ss.bucket_counter.prefixes[0])
2877             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2878             html = w.renderSynchronously()
2879             s = remove_tags(html)
2880             self.failUnlessIn(" Current crawl ", s)
2881             self.failUnlessIn(" (next work in ", s)
2882         d.addCallback(_check)
2883
2884         # now give it enough time to complete a full cycle
2885         def _watch():
2886             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2887         d.addCallback(lambda ignored: self.poll(_watch))
2888         def _check2(ignored):
2889             ss.bucket_counter.cpu_slice = orig_cpu_slice
2890             html = w.renderSynchronously()
2891             s = remove_tags(html)
2892             self.failUnlessIn("Total buckets: 0 (the number of", s)
2893             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2894         d.addCallback(_check2)
2895         return d
2896
2897     def test_bucket_counter_cleanup(self):
2898         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2899         fileutil.make_dirs(basedir)
2900         ss = StorageServer(basedir, "\x00" * 20)
2901         # to make sure we capture the bucket-counting-crawler in the middle
2902         # of a cycle, we reach in and reduce its maximum slice time to 0.
2903         ss.bucket_counter.slow_start = 0
2904         orig_cpu_slice = ss.bucket_counter.cpu_slice
2905         ss.bucket_counter.cpu_slice = 0
2906         ss.setServiceParent(self.s)
2907
2908         d = fireEventually()
2909
2910         def _after_first_prefix(ignored):
2911             state = ss.bucket_counter.state
2912             if state["last-complete-prefix"] is None:
2913                 d2 = fireEventually()
2914                 d2.addCallback(_after_first_prefix)
2915                 return d2
2916             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2917             # now sneak in and mess with its state, to make sure it cleans up
2918             # properly at the end of the cycle
2919             self.failUnlessEqual(state["last-complete-prefix"],
2920                                  ss.bucket_counter.prefixes[0])
2921             state["bucket-counts"][-12] = {}
2922             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2923             ss.bucket_counter.save_state()
2924         d.addCallback(_after_first_prefix)
2925
2926         # now give it enough time to complete a cycle
2927         def _watch():
2928             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2929         d.addCallback(lambda ignored: self.poll(_watch))
2930         def _check2(ignored):
2931             ss.bucket_counter.cpu_slice = orig_cpu_slice
2932             s = ss.bucket_counter.get_state()
2933             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2934             self.failIf("bogusprefix!" in s["storage-index-samples"],
2935                         s["storage-index-samples"].keys())
2936         d.addCallback(_check2)
2937         return d
2938
2939     def test_bucket_counter_eta(self):
2940         basedir = "storage/BucketCounter/bucket_counter_eta"
2941         fileutil.make_dirs(basedir)
2942         ss = MyStorageServer(basedir, "\x00" * 20)
2943         ss.bucket_counter.slow_start = 0
2944         # these will be fired inside finished_prefix()
2945         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2946         w = StorageStatus(ss)
2947
2948         d = defer.Deferred()
2949
2950         def _check_1(ignored):
2951             # no ETA is available yet
2952             html = w.renderSynchronously()
2953             s = remove_tags(html)
2954             self.failUnlessIn("complete (next work", s)
2955
2956         def _check_2(ignored):
2957             # one prefix has finished, so an ETA based upon that elapsed time
2958             # should be available.
2959             html = w.renderSynchronously()
2960             s = remove_tags(html)
2961             self.failUnlessIn("complete (ETA ", s)
2962
2963         def _check_3(ignored):
2964             # two prefixes have finished
2965             html = w.renderSynchronously()
2966             s = remove_tags(html)
2967             self.failUnlessIn("complete (ETA ", s)
2968             d.callback("done")
2969
2970         hooks[0].addCallback(_check_1).addErrback(d.errback)
2971         hooks[1].addCallback(_check_2).addErrback(d.errback)
2972         hooks[2].addCallback(_check_3).addErrback(d.errback)
2973
2974         ss.setServiceParent(self.s)
2975         return d
2976
2977 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
2978     stop_after_first_bucket = False
2979     def process_bucket(self, *args, **kwargs):
2980         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
2981         if self.stop_after_first_bucket:
2982             self.stop_after_first_bucket = False
2983             self.cpu_slice = -1.0
2984     def yielding(self, sleep_time):
2985         if not self.stop_after_first_bucket:
2986             self.cpu_slice = 500
2987
2988 class BrokenStatResults:
2989     pass
2990 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
2991     def stat(self, fn):
2992         s = os.stat(fn)
2993         bsr = BrokenStatResults()
2994         for attrname in dir(s):
2995             if attrname.startswith("_"):
2996                 continue
2997             if attrname == "st_blocks":
2998                 continue
2999             setattr(bsr, attrname, getattr(s, attrname))
3000         return bsr
3001
3002 class InstrumentedStorageServer(StorageServer):
3003     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3004 class No_ST_BLOCKS_StorageServer(StorageServer):
3005     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3006
3007 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3008
3009     def setUp(self):
3010         self.s = service.MultiService()
3011         self.s.startService()
3012     def tearDown(self):
3013         return self.s.stopService()
3014
3015     def make_shares(self, ss):
3016         def make(si):
3017             return (si, hashutil.tagged_hash("renew", si),
3018                     hashutil.tagged_hash("cancel", si))
3019         def make_mutable(si):
3020             return (si, hashutil.tagged_hash("renew", si),
3021                     hashutil.tagged_hash("cancel", si),
3022                     hashutil.tagged_hash("write-enabler", si))
3023         def make_extra_lease(si, num):
3024             return (hashutil.tagged_hash("renew-%d" % num, si),
3025                     hashutil.tagged_hash("cancel-%d" % num, si))
3026
3027         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3028         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3029         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3030         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3031         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3032         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3033         sharenums = [0]
3034         canary = FakeCanary()
3035         # note: 'tahoe debug dump-share' will not handle this file, since the
3036         # inner contents are not a valid CHK share
3037         data = "\xff" * 1000
3038
3039         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3040                                          1000, canary)
3041         w[0].remote_write(0, data)
3042         w[0].remote_close()
3043
3044         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3045                                          1000, canary)
3046         w[0].remote_write(0, data)
3047         w[0].remote_close()
3048         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3049
3050         writev = ss.remote_slot_testv_and_readv_and_writev
3051         writev(mutable_si_2, (we2, rs2, cs2),
3052                {0: ([], [(0,data)], len(data))}, [])
3053         writev(mutable_si_3, (we3, rs3, cs3),
3054                {0: ([], [(0,data)], len(data))}, [])
3055         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3056
3057         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3058         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3059         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3060
3061     def test_basic(self):
3062         basedir = "storage/LeaseCrawler/basic"
3063         fileutil.make_dirs(basedir)
3064         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3065         # make it start sooner than usual.
3066         lc = ss.lease_checker
3067         lc.slow_start = 0
3068         lc.cpu_slice = 500
3069         lc.stop_after_first_bucket = True
3070         webstatus = StorageStatus(ss)
3071
3072         # create a few shares, with some leases on them
3073         self.make_shares(ss)
3074         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3075
3076         # add a non-sharefile to exercise another code path
3077         fn = os.path.join(ss.sharedir,
3078                           storage_index_to_dir(immutable_si_0),
3079                           "not-a-share")
3080         f = open(fn, "wb")
3081         f.write("I am not a share.\n")
3082         f.close()
3083
3084         # this is before the crawl has started, so we're not in a cycle yet
3085         initial_state = lc.get_state()
3086         self.failIf(lc.get_progress()["cycle-in-progress"])
3087         self.failIfIn("cycle-to-date", initial_state)
3088         self.failIfIn("estimated-remaining-cycle", initial_state)
3089         self.failIfIn("estimated-current-cycle", initial_state)
3090         self.failUnlessIn("history", initial_state)
3091         self.failUnlessEqual(initial_state["history"], {})
3092
3093         ss.setServiceParent(self.s)
3094
3095         DAY = 24*60*60
3096
3097         d = fireEventually()
3098
3099         # now examine the state right after the first bucket has been
3100         # processed.
3101         def _after_first_bucket(ignored):
3102             initial_state = lc.get_state()
3103             if "cycle-to-date" not in initial_state:
3104                 d2 = fireEventually()
3105                 d2.addCallback(_after_first_bucket)
3106                 return d2
3107             self.failUnlessIn("cycle-to-date", initial_state)
3108             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3109             self.failUnlessIn("estimated-current-cycle", initial_state)
3110             self.failUnlessIn("history", initial_state)
3111             self.failUnlessEqual(initial_state["history"], {})
3112
3113             so_far = initial_state["cycle-to-date"]
3114             self.failUnlessEqual(so_far["expiration-enabled"], False)
3115             self.failUnlessIn("configured-expiration-mode", so_far)
3116             self.failUnlessIn("lease-age-histogram", so_far)
3117             lah = so_far["lease-age-histogram"]
3118             self.failUnlessEqual(type(lah), list)
3119             self.failUnlessEqual(len(lah), 1)
3120             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3121             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3122             self.failUnlessEqual(so_far["corrupt-shares"], [])
3123             sr1 = so_far["space-recovered"]
3124             self.failUnlessEqual(sr1["examined-buckets"], 1)
3125             self.failUnlessEqual(sr1["examined-shares"], 1)
3126             self.failUnlessEqual(sr1["actual-shares"], 0)
3127             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3128             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3129             left = initial_state["estimated-remaining-cycle"]
3130             sr2 = left["space-recovered"]
3131             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3132             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3133             self.failIfEqual(sr2["actual-shares"], None)
3134             self.failIfEqual(sr2["configured-diskbytes"], None)
3135             self.failIfEqual(sr2["original-sharebytes"], None)
3136         d.addCallback(_after_first_bucket)
3137         d.addCallback(lambda ign: self.render1(webstatus))
3138         def _check_html_in_cycle(html):
3139             s = remove_tags(html)
3140             self.failUnlessIn("So far, this cycle has examined "
3141                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3142             self.failUnlessIn("and has recovered: "
3143                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3144                               "0 B (0 B / 0 B)", s)
3145             self.failUnlessIn("If expiration were enabled, "
3146                               "we would have recovered: "
3147                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3148                               " 0 B (0 B / 0 B) by now", s)
3149             self.failUnlessIn("and the remainder of this cycle "
3150                               "would probably recover: "
3151                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3152                               " 0 B (0 B / 0 B)", s)
3153             self.failUnlessIn("and the whole cycle would probably recover: "
3154                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3155                               " 0 B (0 B / 0 B)", s)
3156             self.failUnlessIn("if we were strictly using each lease's default "
3157                               "31-day lease lifetime", s)
3158             self.failUnlessIn("this cycle would be expected to recover: ", s)
3159         d.addCallback(_check_html_in_cycle)
3160
3161         # wait for the crawler to finish the first cycle. Nothing should have
3162         # been removed.
3163         def _wait():
3164             return bool(lc.get_state()["last-cycle-finished"] is not None)
3165         d.addCallback(lambda ign: self.poll(_wait))
3166
3167         def _after_first_cycle(ignored):
3168             s = lc.get_state()
3169             self.failIf("cycle-to-date" in s)
3170             self.failIf("estimated-remaining-cycle" in s)
3171             self.failIf("estimated-current-cycle" in s)
3172             last = s["history"][0]
3173             self.failUnlessIn("cycle-start-finish-times", last)
3174             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3175             self.failUnlessEqual(last["expiration-enabled"], False)
3176             self.failUnlessIn("configured-expiration-mode", last)
3177
3178             self.failUnlessIn("lease-age-histogram", last)
3179             lah = last["lease-age-histogram"]
3180             self.failUnlessEqual(type(lah), list)
3181             self.failUnlessEqual(len(lah), 1)
3182             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3183
3184             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3185             self.failUnlessEqual(last["corrupt-shares"], [])
3186
3187             rec = last["space-recovered"]
3188             self.failUnlessEqual(rec["examined-buckets"], 4)
3189             self.failUnlessEqual(rec["examined-shares"], 4)
3190             self.failUnlessEqual(rec["actual-buckets"], 0)
3191             self.failUnlessEqual(rec["original-buckets"], 0)
3192             self.failUnlessEqual(rec["configured-buckets"], 0)
3193             self.failUnlessEqual(rec["actual-shares"], 0)
3194             self.failUnlessEqual(rec["original-shares"], 0)
3195             self.failUnlessEqual(rec["configured-shares"], 0)
3196             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3197             self.failUnlessEqual(rec["original-diskbytes"], 0)
3198             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3199             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3200             self.failUnlessEqual(rec["original-sharebytes"], 0)
3201             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3202
3203             def _get_sharefile(si):
3204                 return list(ss._iter_share_files(si))[0]
3205             def count_leases(si):
3206                 return len(list(_get_sharefile(si).get_leases()))
3207             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3208             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3209             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3210             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3211         d.addCallback(_after_first_cycle)
3212         d.addCallback(lambda ign: self.render1(webstatus))
3213         def _check_html(html):
3214             s = remove_tags(html)
3215             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3216                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3217             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3218                               "(2 mutable / 2 immutable),", s)
3219             self.failUnlessIn("but expiration was not enabled", s)
3220         d.addCallback(_check_html)
3221         d.addCallback(lambda ign: self.render_json(webstatus))
3222         def _check_json(json):
3223             data = simplejson.loads(json)
3224             self.failUnlessIn("lease-checker", data)
3225             self.failUnlessIn("lease-checker-progress", data)
3226         d.addCallback(_check_json)
3227         return d
3228
3229     def backdate_lease(self, sf, renew_secret, new_expire_time):
3230         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3231         # "renew" a lease with a new_expire_time that is older than what the
3232         # current lease has), so we have to reach inside it.
3233         for i,lease in enumerate(sf.get_leases()):
3234             if lease.renew_secret == renew_secret:
3235                 lease.expiration_time = new_expire_time
3236                 f = open(sf.home, 'rb+')
3237                 sf._write_lease_record(f, i, lease)
3238                 f.close()
3239                 return
3240         raise IndexError("unable to renew non-existent lease")
3241
3242     def test_expire_age(self):
3243         basedir = "storage/LeaseCrawler/expire_age"
3244         fileutil.make_dirs(basedir)
3245         # setting expiration_time to 2000 means that any lease which is more
3246         # than 2000s old will be expired.
3247         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3248                                        expiration_enabled=True,
3249                                        expiration_mode="age",
3250                                        expiration_override_lease_duration=2000)
3251         # make it start sooner than usual.
3252         lc = ss.lease_checker
3253         lc.slow_start = 0
3254         lc.stop_after_first_bucket = True
3255         webstatus = StorageStatus(ss)
3256
3257         # create a few shares, with some leases on them
3258         self.make_shares(ss)
3259         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3260
3261         def count_shares(si):
3262             return len(list(ss._iter_share_files(si)))
3263         def _get_sharefile(si):
3264             return list(ss._iter_share_files(si))[0]
3265         def count_leases(si):
3266             return len(list(_get_sharefile(si).get_leases()))
3267
3268         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3269         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3270         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3271         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3272         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3273         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3274         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3275         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3276
3277         # artificially crank back the expiration time on the first lease of
3278         # each share, to make it look like it expired already (age=1000s).
3279         # Some shares have an extra lease which is set to expire at the
3280         # default time in 31 days from now (age=31days). We then run the
3281         # crawler, which will expire the first lease, making some shares get
3282         # deleted and others stay alive (with one remaining lease)
3283         now = time.time()
3284
3285         sf0 = _get_sharefile(immutable_si_0)
3286         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3287         sf0_size = os.stat(sf0.home).st_size
3288
3289         # immutable_si_1 gets an extra lease
3290         sf1 = _get_sharefile(immutable_si_1)
3291         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3292
3293         sf2 = _get_sharefile(mutable_si_2)
3294         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3295         sf2_size = os.stat(sf2.home).st_size
3296
3297         # mutable_si_3 gets an extra lease
3298         sf3 = _get_sharefile(mutable_si_3)
3299         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3300
3301         ss.setServiceParent(self.s)
3302
3303         d = fireEventually()
3304         # examine the state right after the first bucket has been processed
3305         def _after_first_bucket(ignored):
3306             p = lc.get_progress()
3307             if not p["cycle-in-progress"]:
3308                 d2 = fireEventually()
3309                 d2.addCallback(_after_first_bucket)
3310                 return d2
3311         d.addCallback(_after_first_bucket)
3312         d.addCallback(lambda ign: self.render1(webstatus))
3313         def _check_html_in_cycle(html):
3314             s = remove_tags(html)
3315             # the first bucket encountered gets deleted, and its prefix
3316             # happens to be about 1/5th of the way through the ring, so the
3317             # predictor thinks we'll have 5 shares and that we'll delete them
3318             # all. This part of the test depends upon the SIs landing right
3319             # where they do now.
3320             self.failUnlessIn("The remainder of this cycle is expected to "
3321                               "recover: 4 shares, 4 buckets", s)
3322             self.failUnlessIn("The whole cycle is expected to examine "
3323                               "5 shares in 5 buckets and to recover: "
3324                               "5 shares, 5 buckets", s)
3325         d.addCallback(_check_html_in_cycle)
3326
3327         # wait for the crawler to finish the first cycle. Two shares should
3328         # have been removed
3329         def _wait():
3330             return bool(lc.get_state()["last-cycle-finished"] is not None)
3331         d.addCallback(lambda ign: self.poll(_wait))
3332
3333         def _after_first_cycle(ignored):
3334             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3335             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3336             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3337             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3338             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3339             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3340
3341             s = lc.get_state()
3342             last = s["history"][0]
3343
3344             self.failUnlessEqual(last["expiration-enabled"], True)
3345             self.failUnlessEqual(last["configured-expiration-mode"],
3346                                  ("age", 2000, None, ("mutable", "immutable")))
3347             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3348
3349             rec = last["space-recovered"]
3350             self.failUnlessEqual(rec["examined-buckets"], 4)
3351             self.failUnlessEqual(rec["examined-shares"], 4)
3352             self.failUnlessEqual(rec["actual-buckets"], 2)
3353             self.failUnlessEqual(rec["original-buckets"], 2)
3354             self.failUnlessEqual(rec["configured-buckets"], 2)
3355             self.failUnlessEqual(rec["actual-shares"], 2)
3356             self.failUnlessEqual(rec["original-shares"], 2)
3357             self.failUnlessEqual(rec["configured-shares"], 2)
3358             size = sf0_size + sf2_size
3359             self.failUnlessEqual(rec["actual-sharebytes"], size)
3360             self.failUnlessEqual(rec["original-sharebytes"], size)
3361             self.failUnlessEqual(rec["configured-sharebytes"], size)
3362             # different platforms have different notions of "blocks used by
3363             # this file", so merely assert that it's a number
3364             self.failUnless(rec["actual-diskbytes"] >= 0,
3365                             rec["actual-diskbytes"])
3366             self.failUnless(rec["original-diskbytes"] >= 0,
3367                             rec["original-diskbytes"])
3368             self.failUnless(rec["configured-diskbytes"] >= 0,
3369                             rec["configured-diskbytes"])
3370         d.addCallback(_after_first_cycle)
3371         d.addCallback(lambda ign: self.render1(webstatus))
3372         def _check_html(html):
3373             s = remove_tags(html)
3374             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3375             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3376             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3377         d.addCallback(_check_html)
3378         return d
3379
3380     def test_expire_cutoff_date(self):
3381         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3382         fileutil.make_dirs(basedir)
3383         # setting cutoff-date to 2000 seconds ago means that any lease which
3384         # is more than 2000s old will be expired.
3385         now = time.time()
3386         then = int(now - 2000)
3387         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3388                                        expiration_enabled=True,
3389                                        expiration_mode="cutoff-date",
3390                                        expiration_cutoff_date=then)
3391         # make it start sooner than usual.
3392         lc = ss.lease_checker
3393         lc.slow_start = 0
3394         lc.stop_after_first_bucket = True
3395         webstatus = StorageStatus(ss)
3396
3397         # create a few shares, with some leases on them
3398         self.make_shares(ss)
3399         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3400
3401         def count_shares(si):
3402             return len(list(ss._iter_share_files(si)))
3403         def _get_sharefile(si):
3404             return list(ss._iter_share_files(si))[0]
3405         def count_leases(si):
3406             return len(list(_get_sharefile(si).get_leases()))
3407
3408         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3409         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3410         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3411         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3412         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3413         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3414         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3415         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3416
3417         # artificially crank back the expiration time on the first lease of
3418         # each share, to make it look like was renewed 3000s ago. To achieve
3419         # this, we need to set the expiration time to now-3000+31days. This
3420         # will change when the lease format is improved to contain both
3421         # create/renew time and duration.
3422         new_expiration_time = now - 3000 + 31*24*60*60
3423
3424         # Some shares have an extra lease which is set to expire at the
3425         # default time in 31 days from now (age=31days). We then run the
3426         # crawler, which will expire the first lease, making some shares get
3427         # deleted and others stay alive (with one remaining lease)
3428
3429         sf0 = _get_sharefile(immutable_si_0)
3430         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3431         sf0_size = os.stat(sf0.home).st_size
3432
3433         # immutable_si_1 gets an extra lease
3434         sf1 = _get_sharefile(immutable_si_1)
3435         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3436
3437         sf2 = _get_sharefile(mutable_si_2)
3438         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3439         sf2_size = os.stat(sf2.home).st_size
3440
3441         # mutable_si_3 gets an extra lease
3442         sf3 = _get_sharefile(mutable_si_3)
3443         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3444
3445         ss.setServiceParent(self.s)
3446
3447         d = fireEventually()
3448         # examine the state right after the first bucket has been processed
3449         def _after_first_bucket(ignored):
3450             p = lc.get_progress()
3451             if not p["cycle-in-progress"]:
3452                 d2 = fireEventually()
3453                 d2.addCallback(_after_first_bucket)
3454                 return d2
3455         d.addCallback(_after_first_bucket)
3456         d.addCallback(lambda ign: self.render1(webstatus))
3457         def _check_html_in_cycle(html):
3458             s = remove_tags(html)
3459             # the first bucket encountered gets deleted, and its prefix
3460             # happens to be about 1/5th of the way through the ring, so the
3461             # predictor thinks we'll have 5 shares and that we'll delete them
3462             # all. This part of the test depends upon the SIs landing right
3463             # where they do now.
3464             self.failUnlessIn("The remainder of this cycle is expected to "
3465                               "recover: 4 shares, 4 buckets", s)
3466             self.failUnlessIn("The whole cycle is expected to examine "
3467                               "5 shares in 5 buckets and to recover: "
3468                               "5 shares, 5 buckets", s)
3469         d.addCallback(_check_html_in_cycle)
3470
3471         # wait for the crawler to finish the first cycle. Two shares should
3472         # have been removed
3473         def _wait():
3474             return bool(lc.get_state()["last-cycle-finished"] is not None)
3475         d.addCallback(lambda ign: self.poll(_wait))
3476
3477         def _after_first_cycle(ignored):
3478             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3479             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3480             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3481             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3482             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3483             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3484
3485             s = lc.get_state()
3486             last = s["history"][0]
3487
3488             self.failUnlessEqual(last["expiration-enabled"], True)
3489             self.failUnlessEqual(last["configured-expiration-mode"],
3490                                  ("cutoff-date", None, then,
3491                                   ("mutable", "immutable")))
3492             self.failUnlessEqual(last["leases-per-share-histogram"],
3493                                  {1: 2, 2: 2})
3494
3495             rec = last["space-recovered"]
3496             self.failUnlessEqual(rec["examined-buckets"], 4)
3497             self.failUnlessEqual(rec["examined-shares"], 4)
3498             self.failUnlessEqual(rec["actual-buckets"], 2)
3499             self.failUnlessEqual(rec["original-buckets"], 0)
3500             self.failUnlessEqual(rec["configured-buckets"], 2)
3501             self.failUnlessEqual(rec["actual-shares"], 2)
3502             self.failUnlessEqual(rec["original-shares"], 0)
3503             self.failUnlessEqual(rec["configured-shares"], 2)
3504             size = sf0_size + sf2_size
3505             self.failUnlessEqual(rec["actual-sharebytes"], size)
3506             self.failUnlessEqual(rec["original-sharebytes"], 0)
3507             self.failUnlessEqual(rec["configured-sharebytes"], size)
3508             # different platforms have different notions of "blocks used by
3509             # this file", so merely assert that it's a number
3510             self.failUnless(rec["actual-diskbytes"] >= 0,
3511                             rec["actual-diskbytes"])
3512             self.failUnless(rec["original-diskbytes"] >= 0,
3513                             rec["original-diskbytes"])
3514             self.failUnless(rec["configured-diskbytes"] >= 0,
3515                             rec["configured-diskbytes"])
3516         d.addCallback(_after_first_cycle)
3517         d.addCallback(lambda ign: self.render1(webstatus))
3518         def _check_html(html):
3519             s = remove_tags(html)
3520             self.failUnlessIn("Expiration Enabled:"
3521                               " expired leases will be removed", s)
3522             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3523             substr = "Leases created or last renewed before %s will be considered expired." % date
3524             self.failUnlessIn(substr, s)
3525             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3526         d.addCallback(_check_html)
3527         return d
3528
3529     def test_only_immutable(self):
3530         basedir = "storage/LeaseCrawler/only_immutable"
3531         fileutil.make_dirs(basedir)
3532         now = time.time()
3533         then = int(now - 2000)
3534         ss = StorageServer(basedir, "\x00" * 20,
3535                            expiration_enabled=True,
3536                            expiration_mode="cutoff-date",
3537                            expiration_cutoff_date=then,
3538                            expiration_sharetypes=("immutable",))
3539         lc = ss.lease_checker
3540         lc.slow_start = 0
3541         webstatus = StorageStatus(ss)
3542
3543         self.make_shares(ss)
3544         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3545         # set all leases to be expirable
3546         new_expiration_time = now - 3000 + 31*24*60*60
3547
3548         def count_shares(si):
3549             return len(list(ss._iter_share_files(si)))
3550         def _get_sharefile(si):
3551             return list(ss._iter_share_files(si))[0]
3552         def count_leases(si):
3553             return len(list(_get_sharefile(si).get_leases()))
3554
3555         sf0 = _get_sharefile(immutable_si_0)
3556         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3557         sf1 = _get_sharefile(immutable_si_1)
3558         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3559         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3560         sf2 = _get_sharefile(mutable_si_2)
3561         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3562         sf3 = _get_sharefile(mutable_si_3)
3563         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3564         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3565
3566         ss.setServiceParent(self.s)
3567         def _wait():
3568             return bool(lc.get_state()["last-cycle-finished"] is not None)
3569         d = self.poll(_wait)
3570
3571         def _after_first_cycle(ignored):
3572             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3573             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3574             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3575             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3576             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3577             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3578         d.addCallback(_after_first_cycle)
3579         d.addCallback(lambda ign: self.render1(webstatus))
3580         def _check_html(html):
3581             s = remove_tags(html)
3582             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3583         d.addCallback(_check_html)
3584         return d
3585
3586     def test_only_mutable(self):
3587         basedir = "storage/LeaseCrawler/only_mutable"
3588         fileutil.make_dirs(basedir)
3589         now = time.time()
3590         then = int(now - 2000)
3591         ss = StorageServer(basedir, "\x00" * 20,
3592                            expiration_enabled=True,
3593                            expiration_mode="cutoff-date",
3594                            expiration_cutoff_date=then,
3595                            expiration_sharetypes=("mutable",))
3596         lc = ss.lease_checker
3597         lc.slow_start = 0
3598         webstatus = StorageStatus(ss)
3599
3600         self.make_shares(ss)
3601         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3602         # set all leases to be expirable
3603         new_expiration_time = now - 3000 + 31*24*60*60
3604
3605         def count_shares(si):
3606             return len(list(ss._iter_share_files(si)))
3607         def _get_sharefile(si):
3608             return list(ss._iter_share_files(si))[0]
3609         def count_leases(si):
3610             return len(list(_get_sharefile(si).get_leases()))
3611
3612         sf0 = _get_sharefile(immutable_si_0)
3613         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3614         sf1 = _get_sharefile(immutable_si_1)
3615         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3616         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3617         sf2 = _get_sharefile(mutable_si_2)
3618         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3619         sf3 = _get_sharefile(mutable_si_3)
3620         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3621         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3622
3623         ss.setServiceParent(self.s)
3624         def _wait():
3625             return bool(lc.get_state()["last-cycle-finished"] is not None)
3626         d = self.poll(_wait)
3627
3628         def _after_first_cycle(ignored):
3629             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3630             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3631             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3632             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3633             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3634             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3635         d.addCallback(_after_first_cycle)
3636         d.addCallback(lambda ign: self.render1(webstatus))
3637         def _check_html(html):
3638             s = remove_tags(html)
3639             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3640         d.addCallback(_check_html)
3641         return d
3642
3643     def test_bad_mode(self):
3644         basedir = "storage/LeaseCrawler/bad_mode"
3645         fileutil.make_dirs(basedir)
3646         e = self.failUnlessRaises(ValueError,
3647                                   StorageServer, basedir, "\x00" * 20,
3648                                   expiration_mode="bogus")
3649         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3650
3651     def test_parse_duration(self):
3652         DAY = 24*60*60
3653         MONTH = 31*DAY
3654         YEAR = 365*DAY
3655         p = time_format.parse_duration
3656         self.failUnlessEqual(p("7days"), 7*DAY)
3657         self.failUnlessEqual(p("31day"), 31*DAY)
3658         self.failUnlessEqual(p("60 days"), 60*DAY)
3659         self.failUnlessEqual(p("2mo"), 2*MONTH)
3660         self.failUnlessEqual(p("3 month"), 3*MONTH)
3661         self.failUnlessEqual(p("2years"), 2*YEAR)
3662         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3663         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3664
3665     def test_parse_date(self):
3666         p = time_format.parse_date
3667         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3668         self.failUnlessEqual(p("2009-03-18"), 1237334400)
3669
3670     def test_limited_history(self):
3671         basedir = "storage/LeaseCrawler/limited_history"
3672         fileutil.make_dirs(basedir)
3673         ss = StorageServer(basedir, "\x00" * 20)
3674         # make it start sooner than usual.
3675         lc = ss.lease_checker
3676         lc.slow_start = 0
3677         lc.cpu_slice = 500
3678
3679         # create a few shares, with some leases on them
3680         self.make_shares(ss)
3681
3682         ss.setServiceParent(self.s)
3683
3684         def _wait_until_15_cycles_done():
3685             last = lc.state["last-cycle-finished"]
3686             if last is not None and last >= 15:
3687                 return True
3688             if lc.timer:
3689                 lc.timer.reset(0)
3690             return False
3691         d = self.poll(_wait_until_15_cycles_done)
3692
3693         def _check(ignored):
3694             s = lc.get_state()
3695             h = s["history"]
3696             self.failUnlessEqual(len(h), 10)
3697             self.failUnlessEqual(max(h.keys()), 15)
3698             self.failUnlessEqual(min(h.keys()), 6)
3699         d.addCallback(_check)
3700         return d
3701
3702     def test_unpredictable_future(self):
3703         basedir = "storage/LeaseCrawler/unpredictable_future"
3704         fileutil.make_dirs(basedir)
3705         ss = StorageServer(basedir, "\x00" * 20)
3706         # make it start sooner than usual.
3707         lc = ss.lease_checker
3708         lc.slow_start = 0
3709         lc.cpu_slice = -1.0 # stop quickly
3710
3711         self.make_shares(ss)
3712
3713         ss.setServiceParent(self.s)
3714
3715         d = fireEventually()
3716         def _check(ignored):
3717             # this should fire after the first bucket is complete, but before
3718             # the first prefix is complete, so the progress-measurer won't
3719             # think we've gotten far enough to raise our percent-complete
3720             # above 0%, triggering the cannot-predict-the-future code in
3721             # expirer.py . This will have to change if/when the
3722             # progress-measurer gets smart enough to count buckets (we'll
3723             # have to interrupt it even earlier, before it's finished the
3724             # first bucket).
3725             s = lc.get_state()
3726             if "cycle-to-date" not in s:
3727                 d2 = fireEventually()
3728                 d2.addCallback(_check)
3729                 return d2
3730             self.failUnlessIn("cycle-to-date", s)
3731             self.failUnlessIn("estimated-remaining-cycle", s)
3732             self.failUnlessIn("estimated-current-cycle", s)
3733
3734             left = s["estimated-remaining-cycle"]["space-recovered"]
3735             self.failUnlessEqual(left["actual-buckets"], None)
3736             self.failUnlessEqual(left["original-buckets"], None)
3737             self.failUnlessEqual(left["configured-buckets"], None)
3738             self.failUnlessEqual(left["actual-shares"], None)
3739             self.failUnlessEqual(left["original-shares"], None)
3740             self.failUnlessEqual(left["configured-shares"], None)
3741             self.failUnlessEqual(left["actual-diskbytes"], None)
3742             self.failUnlessEqual(left["original-diskbytes"], None)
3743             self.failUnlessEqual(left["configured-diskbytes"], None)
3744             self.failUnlessEqual(left["actual-sharebytes"], None)
3745             self.failUnlessEqual(left["original-sharebytes"], None)
3746             self.failUnlessEqual(left["configured-sharebytes"], None)
3747
3748             full = s["estimated-remaining-cycle"]["space-recovered"]
3749             self.failUnlessEqual(full["actual-buckets"], None)
3750             self.failUnlessEqual(full["original-buckets"], None)
3751             self.failUnlessEqual(full["configured-buckets"], None)
3752             self.failUnlessEqual(full["actual-shares"], None)
3753             self.failUnlessEqual(full["original-shares"], None)
3754             self.failUnlessEqual(full["configured-shares"], None)
3755             self.failUnlessEqual(full["actual-diskbytes"], None)
3756             self.failUnlessEqual(full["original-diskbytes"], None)
3757             self.failUnlessEqual(full["configured-diskbytes"], None)
3758             self.failUnlessEqual(full["actual-sharebytes"], None)
3759             self.failUnlessEqual(full["original-sharebytes"], None)
3760             self.failUnlessEqual(full["configured-sharebytes"], None)
3761
3762         d.addCallback(_check)
3763         return d
3764
3765     def test_no_st_blocks(self):
3766         basedir = "storage/LeaseCrawler/no_st_blocks"
3767         fileutil.make_dirs(basedir)
3768         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3769                                         expiration_mode="age",
3770                                         expiration_override_lease_duration=-1000)
3771         # a negative expiration_time= means the "configured-"
3772         # space-recovered counts will be non-zero, since all shares will have
3773         # expired by then
3774
3775         # make it start sooner than usual.
3776         lc = ss.lease_checker
3777         lc.slow_start = 0
3778
3779         self.make_shares(ss)
3780         ss.setServiceParent(self.s)
3781         def _wait():
3782             return bool(lc.get_state()["last-cycle-finished"] is not None)
3783         d = self.poll(_wait)
3784
3785         def _check(ignored):
3786             s = lc.get_state()
3787             last = s["history"][0]
3788             rec = last["space-recovered"]
3789             self.failUnlessEqual(rec["configured-buckets"], 4)
3790             self.failUnlessEqual(rec["configured-shares"], 4)
3791             self.failUnless(rec["configured-sharebytes"] > 0,
3792                             rec["configured-sharebytes"])
3793             # without the .st_blocks field in os.stat() results, we should be
3794             # reporting diskbytes==sharebytes
3795             self.failUnlessEqual(rec["configured-sharebytes"],
3796                                  rec["configured-diskbytes"])
3797         d.addCallback(_check)
3798         return d
3799
3800     def test_share_corruption(self):
3801         self._poll_should_ignore_these_errors = [
3802             UnknownMutableContainerVersionError,
3803             UnknownImmutableContainerVersionError,
3804             ]
3805         basedir = "storage/LeaseCrawler/share_corruption"
3806         fileutil.make_dirs(basedir)
3807         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3808         w = StorageStatus(ss)
3809         # make it start sooner than usual.
3810         lc = ss.lease_checker
3811         lc.stop_after_first_bucket = True
3812         lc.slow_start = 0
3813         lc.cpu_slice = 500
3814
3815         # create a few shares, with some leases on them
3816         self.make_shares(ss)
3817
3818         # now corrupt one, and make sure the lease-checker keeps going
3819         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3820         first = min(self.sis)
3821         first_b32 = base32.b2a(first)
3822         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3823         f = open(fn, "rb+")
3824         f.seek(0)
3825         f.write("BAD MAGIC")
3826         f.close()
3827         # if get_share_file() doesn't see the correct mutable magic, it
3828         # assumes the file is an immutable share, and then
3829         # immutable.ShareFile sees a bad version. So regardless of which kind
3830         # of share we corrupted, this will trigger an
3831         # UnknownImmutableContainerVersionError.
3832
3833         # also create an empty bucket
3834         empty_si = base32.b2a("\x04"*16)
3835         empty_bucket_dir = os.path.join(ss.sharedir,
3836                                         storage_index_to_dir(empty_si))
3837         fileutil.make_dirs(empty_bucket_dir)
3838
3839         ss.setServiceParent(self.s)
3840
3841         d = fireEventually()
3842
3843         # now examine the state right after the first bucket has been
3844         # processed.
3845         def _after_first_bucket(ignored):
3846             s = lc.get_state()
3847             if "cycle-to-date" not in s:
3848                 d2 = fireEventually()
3849                 d2.addCallback(_after_first_bucket)
3850                 return d2
3851             so_far = s["cycle-to-date"]
3852             rec = so_far["space-recovered"]
3853             self.failUnlessEqual(rec["examined-buckets"], 1)
3854             self.failUnlessEqual(rec["examined-shares"], 0)
3855             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3856         d.addCallback(_after_first_bucket)
3857
3858         d.addCallback(lambda ign: self.render_json(w))
3859         def _check_json(json):
3860             data = simplejson.loads(json)
3861             # grr. json turns all dict keys into strings.
3862             so_far = data["lease-checker"]["cycle-to-date"]
3863             corrupt_shares = so_far["corrupt-shares"]
3864             # it also turns all tuples into lists
3865             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3866         d.addCallback(_check_json)
3867         d.addCallback(lambda ign: self.render1(w))
3868         def _check_html(html):
3869             s = remove_tags(html)
3870             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3871         d.addCallback(_check_html)
3872
3873         def _wait():
3874             return bool(lc.get_state()["last-cycle-finished"] is not None)
3875         d.addCallback(lambda ign: self.poll(_wait))
3876
3877         def _after_first_cycle(ignored):
3878             s = lc.get_state()
3879             last = s["history"][0]
3880             rec = last["space-recovered"]
3881             self.failUnlessEqual(rec["examined-buckets"], 5)
3882             self.failUnlessEqual(rec["examined-shares"], 3)
3883             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3884         d.addCallback(_after_first_cycle)
3885         d.addCallback(lambda ign: self.render_json(w))
3886         def _check_json_history(json):
3887             data = simplejson.loads(json)
3888             last = data["lease-checker"]["history"]["0"]
3889             corrupt_shares = last["corrupt-shares"]
3890             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3891         d.addCallback(_check_json_history)
3892         d.addCallback(lambda ign: self.render1(w))
3893         def _check_html_history(html):
3894             s = remove_tags(html)
3895             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3896         d.addCallback(_check_html_history)
3897
3898         def _cleanup(res):
3899             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3900                                    UnknownImmutableContainerVersionError)
3901             return res
3902         d.addBoth(_cleanup)
3903         return d
3904
3905     def render_json(self, page):
3906         d = self.render1(page, args={"t": ["json"]})
3907         return d
3908
3909 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3910
3911     def setUp(self):
3912         self.s = service.MultiService()
3913         self.s.startService()
3914     def tearDown(self):
3915         return self.s.stopService()
3916
3917     def test_no_server(self):
3918         w = StorageStatus(None)
3919         html = w.renderSynchronously()
3920         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3921
3922     def test_status(self):
3923         basedir = "storage/WebStatus/status"
3924         fileutil.make_dirs(basedir)
3925         ss = StorageServer(basedir, "\x00" * 20)
3926         ss.setServiceParent(self.s)
3927         w = StorageStatus(ss)
3928         d = self.render1(w)
3929         def _check_html(html):
3930             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3931             s = remove_tags(html)
3932             self.failUnlessIn("Accepting new shares: Yes", s)
3933             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3934         d.addCallback(_check_html)
3935         d.addCallback(lambda ign: self.render_json(w))
3936         def _check_json(json):
3937             data = simplejson.loads(json)
3938             s = data["stats"]
3939             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3940             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3941             self.failUnlessIn("bucket-counter", data)
3942             self.failUnlessIn("lease-checker", data)
3943         d.addCallback(_check_json)
3944         return d
3945
3946     def render_json(self, page):
3947         d = self.render1(page, args={"t": ["json"]})
3948         return d
3949
3950     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3951     def test_status_no_disk_stats(self, mock_get_disk_stats):
3952         mock_get_disk_stats.side_effect = AttributeError()
3953
3954         # Some platforms may have no disk stats API. Make sure the code can handle that
3955         # (test runs on all platforms).
3956         basedir = "storage/WebStatus/status_no_disk_stats"
3957         fileutil.make_dirs(basedir)
3958         ss = StorageServer(basedir, "\x00" * 20)
3959         ss.setServiceParent(self.s)
3960         w = StorageStatus(ss)
3961         html = w.renderSynchronously()
3962         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3963         s = remove_tags(html)
3964         self.failUnlessIn("Accepting new shares: Yes", s)
3965         self.failUnlessIn("Total disk space: ?", s)
3966         self.failUnlessIn("Space Available to Tahoe: ?", s)
3967         self.failUnless(ss.get_available_space() is None)
3968
3969     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3970     def test_status_bad_disk_stats(self, mock_get_disk_stats):
3971         mock_get_disk_stats.side_effect = OSError()
3972
3973         # If the API to get disk stats exists but a call to it fails, then the status should
3974         # show that no shares will be accepted, and get_available_space() should be 0.
3975         basedir = "storage/WebStatus/status_bad_disk_stats"
3976         fileutil.make_dirs(basedir)
3977         ss = StorageServer(basedir, "\x00" * 20)
3978         ss.setServiceParent(self.s)
3979         w = StorageStatus(ss)
3980         html = w.renderSynchronously()
3981         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3982         s = remove_tags(html)
3983         self.failUnlessIn("Accepting new shares: No", s)
3984         self.failUnlessIn("Total disk space: ?", s)
3985         self.failUnlessIn("Space Available to Tahoe: ?", s)
3986         self.failUnlessEqual(ss.get_available_space(), 0)
3987
3988     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3989     def test_status_right_disk_stats(self, mock_get_disk_stats):
3990         GB = 1000000000
3991         total            = 5*GB
3992         free_for_root    = 4*GB
3993         free_for_nonroot = 3*GB
3994         reserved_space   = 1*GB
3995         used = total - free_for_root
3996         avail = max(free_for_nonroot - reserved_space, 0)
3997         mock_get_disk_stats.return_value = {
3998             'total': total,
3999             'free_for_root': free_for_root,
4000             'free_for_nonroot': free_for_nonroot,
4001             'used': used,
4002             'avail': avail,
4003         }
4004
4005         basedir = "storage/WebStatus/status_right_disk_stats"
4006         fileutil.make_dirs(basedir)
4007         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4008         expecteddir = ss.sharedir
4009         ss.setServiceParent(self.s)
4010         w = StorageStatus(ss)
4011         html = w.renderSynchronously()
4012
4013         self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4014                     mock_get_disk_stats.call_args_list)
4015
4016         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4017         s = remove_tags(html)
4018         self.failUnlessIn("Total disk space: 5.00 GB", s)
4019         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4020         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4021         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4022         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4023         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4024         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4025
4026     def test_readonly(self):
4027         basedir = "storage/WebStatus/readonly"
4028         fileutil.make_dirs(basedir)
4029         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4030         ss.setServiceParent(self.s)
4031         w = StorageStatus(ss)
4032         html = w.renderSynchronously()
4033         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4034         s = remove_tags(html)
4035         self.failUnlessIn("Accepting new shares: No", s)
4036
4037     def test_reserved(self):
4038         basedir = "storage/WebStatus/reserved"
4039         fileutil.make_dirs(basedir)
4040         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4041         ss.setServiceParent(self.s)
4042         w = StorageStatus(ss)
4043         html = w.renderSynchronously()
4044         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4045         s = remove_tags(html)
4046         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4047
4048     def test_huge_reserved(self):
4049         basedir = "storage/WebStatus/reserved"
4050         fileutil.make_dirs(basedir)
4051         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4052         ss.setServiceParent(self.s)
4053         w = StorageStatus(ss)
4054         html = w.renderSynchronously()
4055         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4056         s = remove_tags(html)
4057         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4058
4059     def test_util(self):
4060         w = StorageStatus(None)
4061         self.failUnlessEqual(w.render_space(None, None), "?")
4062         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4063         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4064         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4065         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4066         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)