import time, os.path, platform, stat, re, simplejson, struct, shutil

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
                                     SIGNED_PREFIX, MDMFHEADER, \
                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
                                     PRIVATE_KEY_SIZE, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
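        # LeaseInfo(owner_num, renew_secret, cancel_secret, expiration_time,
        # nodeid): both secrets are 32-byte strings, and the final argument
        # is the 20-byte server nodeid (all zeros here).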
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data
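        # For reference, the layout of this 85-byte file is:
        #   [ 0:12]  container header: three big-endian uint32 fields
        #            (version, share-data length, lease count, on our
        #            reading of the v1 container format)
        #   [12:13]  the share data itself ('a')
        #   [13:85]  one 72-byte lease record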

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        mockstorageserver = mock.Mock()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
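        # i.e. 1 + 4 + 32 + 32 = 69 bytes, of which everything past the
        # first byte lies beyond the end of the share data.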

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)

class RemoteBucket:

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long (v1 header)
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long (v2 header)

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive unless the filesystem supports efficient sparse files (Mac OS X and Windows don't).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with that of an extant share (but isn't
        the exact same storage index), this won't add an entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)
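        # ("mfwgy33dmf2g" appears to be the abbreviated base32 rendering of
        # the storage index "allocate", as included in BucketReader's str().)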

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
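        # (OVERHEAD is the three 4-byte header fields of an immutable share
        # file; LEASE_SIZE is one lease record: a 4-byte owner number, two
        # 32-byte secrets, and a 4-byte expiration time.)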
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # "discard" is really only used by other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
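        # Each share number maps to a (testv, writev, new_length) triple.
        # ([], [], None) means no test vectors (so the test trivially
        # passes), nothing to write, and no change to the container length
        # -- which suffices to create empty shares.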
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})
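        # note that reading past the end of an (as yet empty) share returns
        # the empty string rather than raising an error.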

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test the operators; the data we're comparing against is '11111' in
        # all cases. test both fail and pass, resetting the data after each
        # one.
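        # Each test vector below is (offset, length, operator, specimen):
        # the server reads `length` bytes at `offset` and compares them to
        # the specimen with the named operator (lt/le/eq/ne/ge/gt). The
        # write vectors are applied only if every test vector passes, and
        # either way the reply carries the data named by the read vector.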
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
1283               {0: ([], [(200, "make me bigger")], None)}, [])
1284         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1285
1286         write("si1", secrets(0),
1287               {0: ([], [(500, "make me really bigger")], None)}, [])
1288         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1289
1290         # now cancel them all
1291         ss.remote_cancel_lease("si1", secrets(0)[2])
1292         ss.remote_cancel_lease("si1", secrets(1)[2])
1293         ss.remote_cancel_lease("si1", secrets(2)[2])
1294         ss.remote_cancel_lease("si1", secrets(3)[2])
1295
1296         # the slot should still be there
1297         remaining_shares = read("si1", [], [(0,10)])
1298         self.failUnlessEqual(len(remaining_shares), 1)
1299         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1300
1301         # cancelling a non-existent lease should raise an IndexError
1302         self.failUnlessRaises(IndexError,
1303                               ss.remote_cancel_lease, "si1", "nonsecret")
1304
1305         # and the slot should still be there
1306         remaining_shares = read("si1", [], [(0,10)])
1307         self.failUnlessEqual(len(remaining_shares), 1)
1308         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1309
1310         ss.remote_cancel_lease("si1", secrets(4)[2])
1311         # now the slot should be gone
1312         no_shares = read("si1", [], [(0,10)])
1313         self.failUnlessEqual(no_shares, {})
1314
1315         # cancelling a lease on a non-existent share should raise an IndexError
1316         self.failUnlessRaises(IndexError,
1317                               ss.remote_cancel_lease, "si2", "nonsecret")
1318
1319     def test_remove(self):
1320         ss = self.create("test_remove")
1321         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1322                       set([0,1,2]), 100)
1323         readv = ss.remote_slot_readv
1324         writev = ss.remote_slot_testv_and_readv_and_writev
1325         secrets = ( self.write_enabler("we1"),
1326                     self.renew_secret("we1"),
1327                     self.cancel_secret("we1") )
1328         # delete sh0 by setting its size to zero
1329         answer = writev("si1", secrets,
1330                         {0: ([], [], 0)},
1331                         [])
1332         # the answer should mention all the shares that existed before the
1333         # write
1334         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1335         # but a new read should show only sh1 and sh2
1336         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1337                              {1: [""], 2: [""]})
1338
1339         # delete sh1 by setting its size to zero
1340         answer = writev("si1", secrets,
1341                         {1: ([], [], 0)},
1342                         [])
1343         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1344         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1345                              {2: [""]})
1346
1347         # delete sh2 by setting its size to zero
1348         answer = writev("si1", secrets,
1349                         {2: ([], [], 0)},
1350                         [])
1351         self.failUnlessEqual(answer, (True, {2:[]}) )
1352         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1353                              {})
1354         # and the bucket directory should now be gone
1355         si = base32.b2a("si1")
1356         # note: this is a detail of the storage server implementation, and
1357         # may change in the future
1358         prefix = si[:2]
1359         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1360         bucketdir = os.path.join(prefixdir, si)
1361         self.failUnless(os.path.exists(prefixdir), prefixdir)
1362         self.failIf(os.path.exists(bucketdir), bucketdir)
1363
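    # A small sketch (hypothetical helper, not used by the tests) of the
    # bucket-path convention checked at the end of test_remove: each
    # bucket lives under a two-character prefix of the base32-encoded
    # storage index.
    def _sketch_bucket_path(self, name, si_s):
        si = base32.b2a(si_s)
        return os.path.join(self.workdir(name), "shares", si[:2], si)
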
1364
1365 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1366     def setUp(self):
1367         self.sparent = LoggingServiceParent()
1368         self._lease_secret = itertools.count()
1369         self.ss = self.create("MDMFProxies storage test server")
1370         self.rref = RemoteBucket()
1371         self.rref.target = self.ss
1372         self.secrets = (self.write_enabler("we_secret"),
1373                         self.renew_secret("renew_secret"),
1374                         self.cancel_secret("cancel_secret"))
1375         self.segment = "aaaaaa"
1376         self.block = "aa"
1377         self.salt = "a" * 16
1378         self.block_hash = "a" * 32
1379         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1380         self.share_hash = self.block_hash
1381         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1382         self.signature = "foobarbaz"
1383         self.verification_key = "vvvvvv"
1384         self.encprivkey = "private"
1385         self.root_hash = self.block_hash
1386         self.salt_hash = self.root_hash
1387         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1388         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1389         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1390         # blockhashes and salt hashes are serialized in the same way,
1391         # only we lop off the first element and store that in the
1392         # header.
1393         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1394
1395
1396     def tearDown(self):
1397         self.sparent.stopService()
1398         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1399
1400
1401     def write_enabler(self, we_tag):
1402         return hashutil.tagged_hash("we_blah", we_tag)
1403
1404
1405     def renew_secret(self, tag):
1406         return hashutil.tagged_hash("renew_blah", str(tag))
1407
1408
1409     def cancel_secret(self, tag):
1410         return hashutil.tagged_hash("cancel_blah", str(tag))
1411
1412
1413     def workdir(self, name):
1414         basedir = os.path.join("storage", "MutableServer", name)
1415         return basedir
1416
1417
1418     def create(self, name):
1419         workdir = self.workdir(name)
1420         ss = StorageServer(workdir, "\x00" * 20)
1421         ss.setServiceParent(self.sparent)
1422         return ss
1423
1424
1425     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1426         # Start with the checkstring
1427         data = struct.pack(">BQ32s",
1428                            1,
1429                            0,
1430                            self.root_hash)
1431         self.checkstring = data
1432         # Next, the encoding parameters
1433         if tail_segment:
1434             data += struct.pack(">BBQQ",
1435                                 3,
1436                                 10,
1437                                 6,
1438                                 33)
1439         elif empty:
1440             data += struct.pack(">BBQQ",
1441                                 3,
1442                                 10,
1443                                 0,
1444                                 0)
1445         else:
1446             data += struct.pack(">BBQQ",
1447                                 3,
1448                                 10,
1449                                 6,
1450                                 36)
1451         # Now we'll build the share data, then compute the offsets.
1452         sharedata = ""
1453         if not tail_segment and not empty:
1454             for i in xrange(6):
1455                 sharedata += self.salt + self.block
1456         elif tail_segment:
1457             for i in xrange(5):
1458                 sharedata += self.salt + self.block
1459             sharedata += self.salt + "a"
1460
1461         # The encrypted private key comes after the shares + salts
1462         offset_size = struct.calcsize(MDMFOFFSETS)
1463         encrypted_private_key_offset = len(data) + offset_size
1464         # The share hash chain comes after the private key
1465         sharehashes_offset = encrypted_private_key_offset + \
1466             len(self.encprivkey)
1467
1468         # The signature comes after the share hash chain.
1469         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1470
1471         verification_key_offset = signature_offset + len(self.signature)
1472         verification_key_end = verification_key_offset + \
1473             len(self.verification_key)
1474
1475         share_data_offset = offset_size
1476         share_data_offset += PRIVATE_KEY_SIZE
1477         share_data_offset += SIGNATURE_SIZE
1478         share_data_offset += VERIFICATION_KEY_SIZE
1479         share_data_offset += SHARE_HASH_CHAIN_SIZE
1480
1481         blockhashes_offset = share_data_offset + len(sharedata)
1482         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1483
1484         data += struct.pack(MDMFOFFSETS,
1485                             encrypted_private_key_offset,
1486                             sharehashes_offset,
1487                             signature_offset,
1488                             verification_key_offset,
1489                             verification_key_end,
1490                             share_data_offset,
1491                             blockhashes_offset,
1492                             eof_offset)
1493
1494         self.offsets = {}
1495         self.offsets['enc_privkey'] = encrypted_private_key_offset
1496         self.offsets['block_hash_tree'] = blockhashes_offset
1497         self.offsets['share_hash_chain'] = sharehashes_offset
1498         self.offsets['signature'] = signature_offset
1499         self.offsets['verification_key'] = verification_key_offset
1500         self.offsets['share_data'] = share_data_offset
1501         self.offsets['verification_key_end'] = verification_key_end
1502         self.offsets['EOF'] = eof_offset
1503
1504         # the private key,
1505         data += self.encprivkey
1506         # the sharehashes
1507         data += self.share_hash_chain_s
1508         # the signature,
1509         data += self.signature
1510         # and the verification key
1511         data += self.verification_key
1512         # Then we'll pad with spaces until we get to the start of the
1513         # share data.
1514         data += " " * (share_data_offset - len(data))
1515
1516         # Then the share data
1517         data += sharedata
1518         # the blockhashes
1519         data += self.block_hash_tree_s
1520         return data
1521
1522
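    # A minimal sketch (hypothetical helper, not part of the original
    # tests) of the header arithmetic behind build_test_mdmf_share: the
    # checkstring and encoding parameters packed above, plus the offset
    # table, form the fixed-size MDMF header that precedes the variable
    # fields. 41 + 18 + 64 = 123 bytes, which is also the prefetch
    # length used in test_read_with_prefetched_mdmf_data below.
    def _sketch_mdmf_header_size(self):
        checkstring_size = struct.calcsize(">BQ32s")  # version + seqnum + root hash = 41
        params_size = struct.calcsize(">BBQQ")        # k + N + segsize + datalen = 18
        offsets_size = struct.calcsize(MDMFOFFSETS)   # the eight offsets packed above = 64
        return checkstring_size + params_size + offsets_size  # 123

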
1523     def write_test_share_to_server(self,
1524                                    storage_index,
1525                                    tail_segment=False,
1526                                    empty=False):
1527         """
1528         I write some data to self.ss for the read tests to read.
1529
1530         If tail_segment=True, then I will write a share that has a
1531         smaller tail segment than other segments.
1532         """
1533         write = self.ss.remote_slot_testv_and_readv_and_writev
1534         data = self.build_test_mdmf_share(tail_segment, empty)
1535         # Finally, we write the whole thing to the storage server in one
1536         # pass.
1537         testvs = [(0, 1, "eq", "")]
1538         tws = {}
1539         tws[0] = (testvs, [(0, data)], None)
1540         readv = [(0, 1)]
1541         results = write(storage_index, self.secrets, tws, readv)
1542         self.failUnless(results[0])
1543
1544
1545     def build_test_sdmf_share(self, empty=False):
1546         if empty:
1547             sharedata = ""
1548         else:
1549             sharedata = self.segment * 6
1550         self.sharedata = sharedata
1551         blocksize = len(sharedata) / 3
1552         block = sharedata[:blocksize]
1553         self.blockdata = block
1554         prefix = struct.pack(">BQ32s16s BBQQ",
1555                              0, # version,
1556                              0,
1557                              self.root_hash,
1558                              self.salt,
1559                              3,
1560                              10,
1561                              len(sharedata),
1562                              len(sharedata),
1563                             )
1564         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1565         signature_offset = post_offset + len(self.verification_key)
1566         sharehashes_offset = signature_offset + len(self.signature)
1567         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1568         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1569         encprivkey_offset = sharedata_offset + len(block)
1570         eof_offset = encprivkey_offset + len(self.encprivkey)
1571         offsets = struct.pack(">LLLLQQ",
1572                               signature_offset,
1573                               sharehashes_offset,
1574                               blockhashes_offset,
1575                               sharedata_offset,
1576                               encprivkey_offset,
1577                               eof_offset)
1578         final_share = "".join([prefix,
1579                            offsets,
1580                            self.verification_key,
1581                            self.signature,
1582                            self.share_hash_chain_s,
1583                            self.block_hash_tree_s,
1584                            block,
1585                            self.encprivkey])
1586         self.offsets = {}
1587         self.offsets['signature'] = signature_offset
1588         self.offsets['share_hash_chain'] = sharehashes_offset
1589         self.offsets['block_hash_tree'] = blockhashes_offset
1590         self.offsets['share_data'] = sharedata_offset
1591         self.offsets['enc_privkey'] = encprivkey_offset
1592         self.offsets['EOF'] = eof_offset
1593         return final_share
1594
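    # A companion sketch (hypothetical helper) for the SDMF layout built
    # above: the signed prefix plus the offset table is 107 bytes, which
    # matches post_offset in build_test_sdmf_share.
    def _sketch_sdmf_header_size(self):
        prefix_size = struct.calcsize(">BQ32s16sBBQQ")  # 1+8+32+16+1+1+8+8 = 75
        offsets_size = struct.calcsize(">LLLLQQ")       # 4*4 + 2*8 = 32
        return prefix_size + offsets_size               # 107
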
1595
1596     def write_sdmf_share_to_server(self,
1597                                    storage_index,
1598                                    empty=False):
1599         # Some tests need SDMF shares to verify that we can still read
1600         # them. This method writes one that resembles, but is not, a real share.
1601         assert self.rref
1602         write = self.ss.remote_slot_testv_and_readv_and_writev
1603         share = self.build_test_sdmf_share(empty)
1604         testvs = [(0, 1, "eq", "")]
1605         tws = {}
1606         tws[0] = (testvs, [(0, share)], None)
1607         readv = []
1608         results = write(storage_index, self.secrets, tws, readv)
1609         self.failUnless(results[0])
1610
1611
1612     def test_read(self):
1613         self.write_test_share_to_server("si1")
1614         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1615         # Check that every method returns what we expect it to.
1616         d = defer.succeed(None)
1617         def _check_block_and_salt((block, salt)):
1618             self.failUnlessEqual(block, self.block)
1619             self.failUnlessEqual(salt, self.salt)
1620
1621         for i in xrange(6):
1622             d.addCallback(lambda ignored, i=i:
1623                 mr.get_block_and_salt(i))
1624             d.addCallback(_check_block_and_salt)
1625
1626         d.addCallback(lambda ignored:
1627             mr.get_encprivkey())
1628         d.addCallback(lambda encprivkey:
1629             self.failUnlessEqual(self.encprivkey, encprivkey))
1630
1631         d.addCallback(lambda ignored:
1632             mr.get_blockhashes())
1633         d.addCallback(lambda blockhashes:
1634             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1635
1636         d.addCallback(lambda ignored:
1637             mr.get_sharehashes())
1638         d.addCallback(lambda sharehashes:
1639             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1640
1641         d.addCallback(lambda ignored:
1642             mr.get_signature())
1643         d.addCallback(lambda signature:
1644             self.failUnlessEqual(signature, self.signature))
1645
1646         d.addCallback(lambda ignored:
1647             mr.get_verification_key())
1648         d.addCallback(lambda verification_key:
1649             self.failUnlessEqual(verification_key, self.verification_key))
1650
1651         d.addCallback(lambda ignored:
1652             mr.get_seqnum())
1653         d.addCallback(lambda seqnum:
1654             self.failUnlessEqual(seqnum, 0))
1655
1656         d.addCallback(lambda ignored:
1657             mr.get_root_hash())
1658         d.addCallback(lambda root_hash:
1659             self.failUnlessEqual(self.root_hash, root_hash))
1660
1661         d.addCallback(lambda ignored:
1662             mr.get_seqnum())
1663         d.addCallback(lambda seqnum:
1664             self.failUnlessEqual(0, seqnum))
1665
1666         d.addCallback(lambda ignored:
1667             mr.get_encoding_parameters())
1668         def _check_encoding_parameters((k, n, segsize, datalen)):
1669             self.failUnlessEqual(k, 3)
1670             self.failUnlessEqual(n, 10)
1671             self.failUnlessEqual(segsize, 6)
1672             self.failUnlessEqual(datalen, 36)
1673         d.addCallback(_check_encoding_parameters)
1674
1675         d.addCallback(lambda ignored:
1676             mr.get_checkstring())
1677         d.addCallback(lambda checkstring:
1678             self.failUnlessEqual(checkstring, self.checkstring))
1679         return d
1680
1681
1682     def test_read_with_different_tail_segment_size(self):
1683         self.write_test_share_to_server("si1", tail_segment=True)
1684         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1685         d = mr.get_block_and_salt(5)
1686         def _check_tail_segment(results):
1687             block, salt = results
1688             self.failUnlessEqual(len(block), 1)
1689             self.failUnlessEqual(block, "a")
1690         d.addCallback(_check_tail_segment)
1691         return d
1692
1693
1694     def test_get_block_with_invalid_segnum(self):
1695         self.write_test_share_to_server("si1")
1696         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1697         d = defer.succeed(None)
1698         d.addCallback(lambda ignored:
1699             self.shouldFail(LayoutInvalid, "test invalid segnum",
1700                             None,
1701                             mr.get_block_and_salt, 7))
1702         return d
1703
1704
1705     def test_get_encoding_parameters_first(self):
1706         self.write_test_share_to_server("si1")
1707         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1708         d = mr.get_encoding_parameters()
1709         def _check_encoding_parameters((k, n, segment_size, datalen)):
1710             self.failUnlessEqual(k, 3)
1711             self.failUnlessEqual(n, 10)
1712             self.failUnlessEqual(segment_size, 6)
1713             self.failUnlessEqual(datalen, 36)
1714         d.addCallback(_check_encoding_parameters)
1715         return d
1716
1717
1718     def test_get_seqnum_first(self):
1719         self.write_test_share_to_server("si1")
1720         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1721         d = mr.get_seqnum()
1722         d.addCallback(lambda seqnum:
1723             self.failUnlessEqual(seqnum, 0))
1724         return d
1725
1726
1727     def test_get_root_hash_first(self):
1728         self.write_test_share_to_server("si1")
1729         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1730         d = mr.get_root_hash()
1731         d.addCallback(lambda root_hash:
1732             self.failUnlessEqual(root_hash, self.root_hash))
1733         return d
1734
1735
1736     def test_get_checkstring_first(self):
1737         self.write_test_share_to_server("si1")
1738         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1739         d = mr.get_checkstring()
1740         d.addCallback(lambda checkstring:
1741             self.failUnlessEqual(checkstring, self.checkstring))
1742         return d
1743
1744
1745     def test_write_read_vectors(self):
1746         # When we write, the storage server returns a read vector
1747         # along with the write result. If a write fails because
1748         # the test vectors failed, this read vector can help us to
1749         # diagnose the problem. This test ensures that the read vector
1750         # is working appropriately.
1751         mw = self._make_new_mw("si1", 0)
1752
1753         for i in xrange(6):
1754             mw.put_block(self.block, i, self.salt)
1755         mw.put_encprivkey(self.encprivkey)
1756         mw.put_blockhashes(self.block_hash_tree)
1757         mw.put_sharehashes(self.share_hash_chain)
1758         mw.put_root_hash(self.root_hash)
1759         mw.put_signature(self.signature)
1760         mw.put_verification_key(self.verification_key)
1761         d = mw.finish_publishing()
1762         def _then(results):
1763             self.failUnlessEqual(len(results), 2)
1764             result, readv = results
1765             self.failUnless(result)
1766             self.failIf(readv)
1767             self.old_checkstring = mw.get_checkstring()
1768             mw.set_checkstring("")
1769         d.addCallback(_then)
1770         d.addCallback(lambda ignored:
1771             mw.finish_publishing())
1772         def _then_again(results):
1773             self.failUnlessEqual(len(results), 2)
1774             result, readvs = results
1775             self.failIf(result)
1776             self.failUnlessIn(0, readvs)
1777             readv = readvs[0][0]
1778             self.failUnlessEqual(readv, self.old_checkstring)
1779         d.addCallback(_then_again)
1780         # The checkstring remains the same for the rest of the process.
1781         return d
1782
1783
1784     def test_private_key_after_share_hash_chain(self):
1785         mw = self._make_new_mw("si1", 0)
1786         d = defer.succeed(None)
1787         for i in xrange(6):
1788             d.addCallback(lambda ignored, i=i:
1789                 mw.put_block(self.block, i, self.salt))
1790         d.addCallback(lambda ignored:
1791             mw.put_encprivkey(self.encprivkey))
1792         d.addCallback(lambda ignored:
1793             mw.put_sharehashes(self.share_hash_chain))
1794
1795         # Now try to put the private key again.
1796         d.addCallback(lambda ignored:
1797             self.shouldFail(LayoutInvalid, "test repeat private key",
1798                             None,
1799                             mw.put_encprivkey, self.encprivkey))
1800         return d
1801
1802
1803     def test_signature_after_verification_key(self):
1804         mw = self._make_new_mw("si1", 0)
1805         d = defer.succeed(None)
1806         # Put everything up to and including the verification key.
1807         for i in xrange(6):
1808             d.addCallback(lambda ignored, i=i:
1809                 mw.put_block(self.block, i, self.salt))
1810         d.addCallback(lambda ignored:
1811             mw.put_encprivkey(self.encprivkey))
1812         d.addCallback(lambda ignored:
1813             mw.put_blockhashes(self.block_hash_tree))
1814         d.addCallback(lambda ignored:
1815             mw.put_sharehashes(self.share_hash_chain))
1816         d.addCallback(lambda ignored:
1817             mw.put_root_hash(self.root_hash))
1818         d.addCallback(lambda ignored:
1819             mw.put_signature(self.signature))
1820         d.addCallback(lambda ignored:
1821             mw.put_verification_key(self.verification_key))
1822         # Now try to put the signature again. This should fail
1823         d.addCallback(lambda ignored:
1824             self.shouldFail(LayoutInvalid, "signature after verification",
1825                             None,
1826                             mw.put_signature, self.signature))
1827         return d
1828
1829
1830     def test_uncoordinated_write(self):
1831         # Make two mutable writers, both pointing to the same storage
1832         # server, both at the same storage index, and try writing to the
1833         # same share.
1834         mw1 = self._make_new_mw("si1", 0)
1835         mw2 = self._make_new_mw("si1", 0)
1836
1837         def _check_success(results):
1838             result, readvs = results
1839             self.failUnless(result)
1840
1841         def _check_failure(results):
1842             result, readvs = results
1843             self.failIf(result)
1844
1845         def _write_share(mw):
1846             for i in xrange(6):
1847                 mw.put_block(self.block, i, self.salt)
1848             mw.put_encprivkey(self.encprivkey)
1849             mw.put_blockhashes(self.block_hash_tree)
1850             mw.put_sharehashes(self.share_hash_chain)
1851             mw.put_root_hash(self.root_hash)
1852             mw.put_signature(self.signature)
1853             mw.put_verification_key(self.verification_key)
1854             return mw.finish_publishing()
1855         d = _write_share(mw1)
1856         d.addCallback(_check_success)
1857         d.addCallback(lambda ignored:
1858             _write_share(mw2))
1859         d.addCallback(_check_failure)
1860         return d
1861
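    # Why the second write fails (a hedged reading of the behavior the
    # surrounding tests exercise): each writer's finish_publishing sends
    # test vectors based on the checkstring it expects to find on the
    # server. Once mw1's publish lands, the share's checkstring changes,
    # so mw2's test vectors no longer match and its write is rejected.
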
1862
1863     def test_invalid_salt_size(self):
1864         # Salts need to be 16 bytes in size. Writes that attempt to
1865         # write more or less than this should be rejected.
1866         mw = self._make_new_mw("si1", 0)
1867         invalid_salt = "a" * 17 # 17 bytes
1868         another_invalid_salt = "b" * 15 # 15 bytes
1869         d = defer.succeed(None)
1870         d.addCallback(lambda ignored:
1871             self.shouldFail(LayoutInvalid, "salt too big",
1872                             None,
1873                             mw.put_block, self.block, 0, invalid_salt))
1874         d.addCallback(lambda ignored:
1875             self.shouldFail(LayoutInvalid, "salt too small",
1876                             None,
1877                             mw.put_block, self.block, 0,
1878                             another_invalid_salt))
1879         return d
1880
1881
1882     def test_write_test_vectors(self):
1883         # If we give the write proxy a bogus test vector at 
1884         # any point during the process, it should fail to write when we 
1885         # tell it to write.
1886         def _check_failure(results):
1887             self.failUnlessEqual(len(results), 2)
1888             res, readv = results
1889             self.failIf(res)
1890
1891         def _check_success(results):
1892             self.failUnlessEqual(len(results), 2)
1893             res, readv = results
1894             self.failUnless(res)
1895
1896         mw = self._make_new_mw("si1", 0)
1897         mw.set_checkstring("this is a lie")
1898         for i in xrange(6):
1899             mw.put_block(self.block, i, self.salt)
1900         mw.put_encprivkey(self.encprivkey)
1901         mw.put_blockhashes(self.block_hash_tree)
1902         mw.put_sharehashes(self.share_hash_chain)
1903         mw.put_root_hash(self.root_hash)
1904         mw.put_signature(self.signature)
1905         mw.put_verification_key(self.verification_key)
1906         d = mw.finish_publishing()
1907         d.addCallback(_check_failure)
1908         d.addCallback(lambda ignored:
1909             mw.set_checkstring(""))
1910         d.addCallback(lambda ignored:
1911             mw.finish_publishing())
1912         d.addCallback(_check_success)
1913         return d
1914
1915
1916     def serialize_blockhashes(self, blockhashes):
1917         return "".join(blockhashes)
1918
1919
1920     def serialize_sharehashes(self, sharehashes):
1921         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1922                         for i in sorted(sharehashes.keys())])
1923         return ret
1924
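    # A hedged sketch of the inverse operation (hypothetical helper, not
    # used by the tests), assuming the same ">H32s" record layout as
    # serialize_sharehashes: a 2-byte share number followed by a 32-byte
    # hash, repeated for each entry.
    def parse_sharehashes(self, data):
        record_size = struct.calcsize(">H32s")  # 34 bytes per entry
        sharehashes = {}
        for offset in xrange(0, len(data), record_size):
            (num, h) = struct.unpack(">H32s", data[offset:offset + record_size])
            sharehashes[num] = h
        return sharehashes
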
1925
1926     def test_write(self):
1927         # This translates to a file with 6 6-byte segments and 2-byte
1928         # blocks.
1929         mw = self._make_new_mw("si1", 0)
1930         # Test writing some blocks.
1931         read = self.ss.remote_slot_readv
1932         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1933         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1934                                     PRIVATE_KEY_SIZE + \
1935                                     SIGNATURE_SIZE + \
1936                                     VERIFICATION_KEY_SIZE + \
1937                                     SHARE_HASH_CHAIN_SIZE
1938         written_block_size = 2 + len(self.salt)
1939         written_block = self.block + self.salt
1940         for i in xrange(6):
1941             mw.put_block(self.block, i, self.salt)
1942
1943         mw.put_encprivkey(self.encprivkey)
1944         mw.put_blockhashes(self.block_hash_tree)
1945         mw.put_sharehashes(self.share_hash_chain)
1946         mw.put_root_hash(self.root_hash)
1947         mw.put_signature(self.signature)
1948         mw.put_verification_key(self.verification_key)
1949         d = mw.finish_publishing()
1950         def _check_publish(results):
1951             self.failUnlessEqual(len(results), 2)
1952             result, ign = results
1953             self.failUnless(result, "publish failed")
1954             for i in xrange(6):
1955                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1956                                 {0: [written_block]})
1957
1958             self.failUnlessEqual(len(self.encprivkey), 7)
1959             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1960                                  {0: [self.encprivkey]})
1961
1962             expected_block_hash_offset = expected_sharedata_offset + \
1963                         (6 * written_block_size)
1964             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1965             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1966                                  {0: [self.block_hash_tree_s]})
1967
1968             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1969             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1970                                  {0: [self.share_hash_chain_s]})
1971
1972             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1973                                  {0: [self.root_hash]})
1974             expected_signature_offset = expected_share_hash_offset + \
1975                 len(self.share_hash_chain_s)
1976             self.failUnlessEqual(len(self.signature), 9)
1977             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1978                                  {0: [self.signature]})
1979
1980             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1981             self.failUnlessEqual(len(self.verification_key), 6)
1982             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1983                                  {0: [self.verification_key]})
1984
1985             signable = mw.get_signable()
1986             verno, seq, roothash, k, n, segsize, datalen = \
1987                                             struct.unpack(">BQ32sBBQQ",
1988                                                           signable)
1989             self.failUnlessEqual(verno, 1)
1990             self.failUnlessEqual(seq, 0)
1991             self.failUnlessEqual(roothash, self.root_hash)
1992             self.failUnlessEqual(k, 3)
1993             self.failUnlessEqual(n, 10)
1994             self.failUnlessEqual(segsize, 6)
1995             self.failUnlessEqual(datalen, 36)
1996             expected_eof_offset = expected_block_hash_offset + \
1997                 len(self.block_hash_tree_s)
1998
1999             # Check the version number to make sure that it is correct.
2000             expected_version_number = struct.pack(">B", 1)
2001             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2002                                  {0: [expected_version_number]})
2003             # Check the sequence number to make sure that it is correct
2004             expected_sequence_number = struct.pack(">Q", 0)
2005             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2006                                  {0: [expected_sequence_number]})
2007             # Check that the encoding parameters (k, N, segment size, data
2008             # length) are what they should be. These are 3, 10, 6, 36.
2009             expected_k = struct.pack(">B", 3)
2010             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2011                                  {0: [expected_k]})
2012             expected_n = struct.pack(">B", 10)
2013             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2014                                  {0: [expected_n]})
2015             expected_segment_size = struct.pack(">Q", 6)
2016             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2017                                  {0: [expected_segment_size]})
2018             expected_data_length = struct.pack(">Q", 36)
2019             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2020                                  {0: [expected_data_length]})
2021             expected_offset = struct.pack(">Q", expected_private_key_offset)
2022             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2023                                  {0: [expected_offset]})
2024             expected_offset = struct.pack(">Q", expected_share_hash_offset)
2025             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2026                                  {0: [expected_offset]})
2027             expected_offset = struct.pack(">Q", expected_signature_offset)
2028             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2029                                  {0: [expected_offset]})
2030             expected_offset = struct.pack(">Q", expected_verification_key_offset)
2031             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2032                                  {0: [expected_offset]})
2033             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2034             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2035                                  {0: [expected_offset]})
2036             expected_offset = struct.pack(">Q", expected_sharedata_offset)
2037             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2038                                  {0: [expected_offset]})
2039             expected_offset = struct.pack(">Q", expected_block_hash_offset)
2040             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2041                                  {0: [expected_offset]})
2042             expected_offset = struct.pack(">Q", expected_eof_offset)
2043             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2044                                  {0: [expected_offset]})
2045         d.addCallback(_check_publish)
2046         return d
2047
2048     def _make_new_mw(self, si, share, datalength=36):
2049         # This is a file of size 36 bytes. Since it has a segment
2050         # size of 6, it has six 6-byte segments, each of which will
2051         # be split into blocks of 2 bytes because our FEC k
2052         # parameter is 3.
2053         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2054                                 6, datalength)
2055         return mw
2056
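    # The geometry described in the comment in _make_new_mw, spelled out
    # as a sketch (hypothetical helper; plain Python 2 integer division
    # is enough for these sizes):
    def _sketch_block_geometry(self, datalength=36, segsize=6, k=3):
        num_segments = datalength / segsize  # 36 / 6 = 6 segments
        block_size = segsize / k             # 6 / 3 = 2-byte blocks
        return num_segments, block_size
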
2057
2058     def test_write_rejected_with_too_many_blocks(self):
2059         mw = self._make_new_mw("si0", 0)
2060
2061         # Try writing too many blocks. We should not be able to write
2062         # more than 6 blocks into each share.
2064         d = defer.succeed(None)
2065         for i in xrange(6):
2066             d.addCallback(lambda ignored, i=i:
2067                 mw.put_block(self.block, i, self.salt))
2068         d.addCallback(lambda ignored:
2069             self.shouldFail(LayoutInvalid, "too many blocks",
2070                             None,
2071                             mw.put_block, self.block, 7, self.salt))
2072         return d
2073
2074
2075     def test_write_rejected_with_invalid_salt(self):
2076         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2077         # less should cause an error.
2078         mw = self._make_new_mw("si1", 0)
2079         bad_salt = "a" * 17 # 17 bytes
2080         d = defer.succeed(None)
2081         d.addCallback(lambda ignored:
2082             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2083                             None, mw.put_block, self.block, 7, bad_salt))
2084         return d
2085
2086
2087     def test_write_rejected_with_invalid_root_hash(self):
2088         # Try writing an invalid root hash. This should be SHA256d, and
2089         # 32 bytes long as a result.
2090         mw = self._make_new_mw("si2", 0)
2091         # 17 bytes != 32 bytes
2092         invalid_root_hash = "a" * 17
2093         d = defer.succeed(None)
2094         # Before this test can work, we need to put some blocks + salts,
2095         # a block hash tree, and a share hash tree. Otherwise, we'll see
2096         # failures that match what we are looking for, but are caused by
2097         # the constraints imposed on operation ordering.
2098         for i in xrange(6):
2099             d.addCallback(lambda ignored, i=i:
2100                 mw.put_block(self.block, i, self.salt))
2101         d.addCallback(lambda ignored:
2102             mw.put_encprivkey(self.encprivkey))
2103         d.addCallback(lambda ignored:
2104             mw.put_blockhashes(self.block_hash_tree))
2105         d.addCallback(lambda ignored:
2106             mw.put_sharehashes(self.share_hash_chain))
2107         d.addCallback(lambda ignored:
2108             self.shouldFail(LayoutInvalid, "invalid root hash",
2109                             None, mw.put_root_hash, invalid_root_hash))
2110         return d
2111
2112
2113     def test_write_rejected_with_invalid_blocksize(self):
2114         # The blocksize implied by the writer that we get from
2115         # _make_new_mw is 2 bytes -- any more or any less than this
2116         # should cause a failure, unless it is the tail segment, in
2117         # which case a shorter block is allowed.
2118         invalid_block = "a"
2119         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2120                                              # one byte blocks
2121         # 1 byte != 2 bytes
2122         d = defer.succeed(None)
2123         d.addCallback(lambda ignored, invalid_block=invalid_block:
2124             self.shouldFail(LayoutInvalid, "test blocksize too small",
2125                             None, mw.put_block, invalid_block, 0,
2126                             self.salt))
2127         invalid_block = invalid_block * 3
2128         # 3 bytes != 2 bytes
2129         d.addCallback(lambda ignored:
2130             self.shouldFail(LayoutInvalid, "test blocksize too large",
2131                             None,
2132                             mw.put_block, invalid_block, 0, self.salt))
2133         for i in xrange(5):
2134             d.addCallback(lambda ignored, i=i:
2135                 mw.put_block(self.block, i, self.salt))
2136         # Try to put an invalid tail segment
2137         d.addCallback(lambda ignored:
2138             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2139                             None,
2140                             mw.put_block, self.block, 5, self.salt))
2141         valid_block = "a"
2142         d.addCallback(lambda ignored:
2143             mw.put_block(valid_block, 5, self.salt))
2144         return d
2145
2146
2147     def test_write_enforces_order_constraints(self):
2148         # We require that the MDMFSlotWriteProxy be interacted with in a
2149         # specific way.
2150         # That way is:
2151         # 0: __init__
2152         # 1: write blocks and salts
2153         # 2: Write the encrypted private key
2154         # 3: Write the block hashes
2155         # 4: Write the share hashes
2156         # 5: Write the root hash and salt hash
2157         # 6: Write the signature and verification key
2158         # 7: Write the file.
2159         # 
2160         # Some of these can be performed out-of-order, and some can't.
2161         # The dependencies that I want to test here are:
2162         #  - Private key before block hashes
2163         #  - share hashes and block hashes before root hash
2164         #  - root hash before signature
2165         #  - signature before verification key
2166         mw0 = self._make_new_mw("si0", 0)
2167         # Write some shares
2168         d = defer.succeed(None)
2169         for i in xrange(6):
2170             d.addCallback(lambda ignored, i=i:
2171                 mw0.put_block(self.block, i, self.salt))
2172
2173         # Try to write the share hash chain without writing the
2174         # encrypted private key
2175         d.addCallback(lambda ignored:
2176             self.shouldFail(LayoutInvalid, "share hash chain before "
2177                                            "private key",
2178                             None,
2179                             mw0.put_sharehashes, self.share_hash_chain))
2180         # Write the private key.
2181         d.addCallback(lambda ignored:
2182             mw0.put_encprivkey(self.encprivkey))
2183
2184         # Now write the block hashes and try again
2185         d.addCallback(lambda ignored:
2186             mw0.put_blockhashes(self.block_hash_tree))
2187
2188         # We haven't yet put the root hash on the share, so we shouldn't
2189         # be able to sign it.
2190         d.addCallback(lambda ignored:
2191             self.shouldFail(LayoutInvalid, "signature before root hash",
2192                             None, mw0.put_signature, self.signature))
2193
2194         d.addCallback(lambda ignored:
2195             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2196
2197         # ...and, since that fails, we also shouldn't be able to put the
2198         # verification key.
2199         d.addCallback(lambda ignored:
2200             self.shouldFail(LayoutInvalid, "key before signature",
2201                             None, mw0.put_verification_key,
2202                             self.verification_key))
2203
2204         # Now write the share hashes.
2205         d.addCallback(lambda ignored:
2206             mw0.put_sharehashes(self.share_hash_chain))
2207         # We should be able to write the root hash now too
2208         d.addCallback(lambda ignored:
2209             mw0.put_root_hash(self.root_hash))
2210
2211         # We should still be unable to put the verification key
2212         d.addCallback(lambda ignored:
2213             self.shouldFail(LayoutInvalid, "key before signature",
2214                             None, mw0.put_verification_key,
2215                             self.verification_key))
2216
2217         d.addCallback(lambda ignored:
2218             mw0.put_signature(self.signature))
2219
2220         # We shouldn't be able to write the offsets to the remote server
2221         # until the offset table is finished; IOW, until we have written
2222         # the verification key.
2223         d.addCallback(lambda ignored:
2224             self.shouldFail(LayoutInvalid, "offsets before verification key",
2225                             None,
2226                             mw0.finish_publishing))
2227
2228         d.addCallback(lambda ignored:
2229             mw0.put_verification_key(self.verification_key))
2230         return d
2231
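    # The ordering the proxy enforces, condensed from the test above (a
    # summary of observed behavior, not new API):
    #   put_block (x6) -> put_encprivkey -> put_blockhashes
    #   -> put_sharehashes -> put_root_hash -> put_signature
    #   -> put_verification_key -> finish_publishing
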
2232
2233     def test_end_to_end(self):
2234         mw = self._make_new_mw("si1", 0)
2235         # Write a share using the mutable writer, and make sure that the
2236         # reader knows how to read everything back to us.
2237         d = defer.succeed(None)
2238         for i in xrange(6):
2239             d.addCallback(lambda ignored, i=i:
2240                 mw.put_block(self.block, i, self.salt))
2241         d.addCallback(lambda ignored:
2242             mw.put_encprivkey(self.encprivkey))
2243         d.addCallback(lambda ignored:
2244             mw.put_blockhashes(self.block_hash_tree))
2245         d.addCallback(lambda ignored:
2246             mw.put_sharehashes(self.share_hash_chain))
2247         d.addCallback(lambda ignored:
2248             mw.put_root_hash(self.root_hash))
2249         d.addCallback(lambda ignored:
2250             mw.put_signature(self.signature))
2251         d.addCallback(lambda ignored:
2252             mw.put_verification_key(self.verification_key))
2253         d.addCallback(lambda ignored:
2254             mw.finish_publishing())
2255
2256         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2257         def _check_block_and_salt((block, salt)):
2258             self.failUnlessEqual(block, self.block)
2259             self.failUnlessEqual(salt, self.salt)
2260
2261         for i in xrange(6):
2262             d.addCallback(lambda ignored, i=i:
2263                 mr.get_block_and_salt(i))
2264             d.addCallback(_check_block_and_salt)
2265
2266         d.addCallback(lambda ignored:
2267             mr.get_encprivkey())
2268         d.addCallback(lambda encprivkey:
2269             self.failUnlessEqual(self.encprivkey, encprivkey))
2270
2271         d.addCallback(lambda ignored:
2272             mr.get_blockhashes())
2273         d.addCallback(lambda blockhashes:
2274             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2275
2276         d.addCallback(lambda ignored:
2277             mr.get_sharehashes())
2278         d.addCallback(lambda sharehashes:
2279             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2280
2281         d.addCallback(lambda ignored:
2282             mr.get_signature())
2283         d.addCallback(lambda signature:
2284             self.failUnlessEqual(signature, self.signature))
2285
2286         d.addCallback(lambda ignored:
2287             mr.get_verification_key())
2288         d.addCallback(lambda verification_key:
2289             self.failUnlessEqual(verification_key, self.verification_key))
2290
2291         d.addCallback(lambda ignored:
2292             mr.get_seqnum())
2293         d.addCallback(lambda seqnum:
2294             self.failUnlessEqual(seqnum, 0))
2295
2296         d.addCallback(lambda ignored:
2297             mr.get_root_hash())
2298         d.addCallback(lambda root_hash:
2299             self.failUnlessEqual(self.root_hash, root_hash))
2300
2301         d.addCallback(lambda ignored:
2302             mr.get_encoding_parameters())
2303         def _check_encoding_parameters((k, n, segsize, datalen)):
2304             self.failUnlessEqual(k, 3)
2305             self.failUnlessEqual(n, 10)
2306             self.failUnlessEqual(segsize, 6)
2307             self.failUnlessEqual(datalen, 36)
2308         d.addCallback(_check_encoding_parameters)
2309
2310         d.addCallback(lambda ignored:
2311             mr.get_checkstring())
2312         d.addCallback(lambda checkstring:
2313             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2314         return d
2315
2316
2317     def test_is_sdmf(self):
2318         # The MDMFSlotReadProxy should also know how to read SDMF files,
2319         # since it will encounter them on the grid. Callers use the
2320         # is_sdmf method to test this.
2321         self.write_sdmf_share_to_server("si1")
2322         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2323         d = mr.is_sdmf()
2324         d.addCallback(lambda issdmf:
2325             self.failUnless(issdmf))
2326         return d
2327
2328
2329     def test_reads_sdmf(self):
2330         # The slot read proxy should, naturally, know how to tell us
2331         # about data in the SDMF format
2332         self.write_sdmf_share_to_server("si1")
2333         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2334         d = defer.succeed(None)
2335         d.addCallback(lambda ignored:
2336             mr.is_sdmf())
2337         d.addCallback(lambda issdmf:
2338             self.failUnless(issdmf))
2339
2340         # What do we need to read?
2341         #  - The sharedata
2342         #  - The salt
2343         d.addCallback(lambda ignored:
2344             mr.get_block_and_salt(0))
2345         def _check_block_and_salt(results):
2346             block, salt = results
2347             # Our original file is 36 bytes long. Then each share is 12
2348             # bytes in size. The share is composed entirely of the
2349             # letter a. self.block contains two "a"s, so 6 * self.block is
2350             # what we are looking for.
2351             self.failUnlessEqual(block, self.block * 6)
2352             self.failUnlessEqual(salt, self.salt)
2353         d.addCallback(_check_block_and_salt)
2354
2355         #  - The blockhashes
2356         d.addCallback(lambda ignored:
2357             mr.get_blockhashes())
2358         d.addCallback(lambda blockhashes:
2359             self.failUnlessEqual(self.block_hash_tree,
2360                                  blockhashes,
2361                                  blockhashes))
2362         #  - The sharehashes
2363         d.addCallback(lambda ignored:
2364             mr.get_sharehashes())
2365         d.addCallback(lambda sharehashes:
2366             self.failUnlessEqual(self.share_hash_chain,
2367                                  sharehashes))
2368         #  - The keys
2369         d.addCallback(lambda ignored:
2370             mr.get_encprivkey())
2371         d.addCallback(lambda encprivkey:
2372             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2373         d.addCallback(lambda ignored:
2374             mr.get_verification_key())
2375         d.addCallback(lambda verification_key:
2376             self.failUnlessEqual(verification_key,
2377                                  self.verification_key,
2378                                  verification_key))
2379         #  - The signature
2380         d.addCallback(lambda ignored:
2381             mr.get_signature())
2382         d.addCallback(lambda signature:
2383             self.failUnlessEqual(signature, self.signature, signature))
2384
2385         #  - The sequence number
2386         d.addCallback(lambda ignored:
2387             mr.get_seqnum())
2388         d.addCallback(lambda seqnum:
2389             self.failUnlessEqual(seqnum, 0, seqnum))
2390
2391         #  - The root hash
2392         d.addCallback(lambda ignored:
2393             mr.get_root_hash())
2394         d.addCallback(lambda root_hash:
2395             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2396         return d
2397
2398
2399     def test_only_reads_one_segment_sdmf(self):
2400         # SDMF shares have only one segment, so it doesn't make sense to
2401         # read more segments than that. The reader should know this and
2402         # complain if we try to do that.
2403         self.write_sdmf_share_to_server("si1")
2404         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2405         d = defer.succeed(None)
2406         d.addCallback(lambda ignored:
2407             mr.is_sdmf())
2408         d.addCallback(lambda issdmf:
2409             self.failUnless(issdmf))
2410         d.addCallback(lambda ignored:
2411             self.shouldFail(LayoutInvalid, "test bad segment",
2412                             None,
2413                             mr.get_block_and_salt, 1))
2414         return d
2415
2416
2417     def test_read_with_prefetched_mdmf_data(self):
2418         # The MDMFSlotReadProxy will prefill certain fields if you pass
2419         # it data that you have already fetched. This is useful for
2420         # cases like the Servermap, which prefetches ~2kb of data while
2421         # finding out which shares are on the remote peer so that it
2422         # doesn't waste round trips.
2423         mdmf_data = self.build_test_mdmf_share()
2424         self.write_test_share_to_server("si1")
2425         def _make_mr(ignored, length):
2426             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2427             return mr
2428
2429         d = defer.succeed(None)
2430         # This should be enough to fill in both the encoding parameters
2431         # and the table of offsets, which will complete the version
2432         # information tuple.
2433         d.addCallback(_make_mr, 123)
2434         d.addCallback(lambda mr:
2435             mr.get_verinfo())
2436         def _check_verinfo(verinfo):
2437             self.failUnless(verinfo)
2438             self.failUnlessEqual(len(verinfo), 9)
2439             (seqnum,
2440              root_hash,
2441              salt_hash,
2442              segsize,
2443              datalen,
2444              k,
2445              n,
2446              prefix,
2447              offsets) = verinfo
2448             self.failUnlessEqual(seqnum, 0)
2449             self.failUnlessEqual(root_hash, self.root_hash)
2450             self.failUnlessEqual(segsize, 6)
2451             self.failUnlessEqual(datalen, 36)
2452             self.failUnlessEqual(k, 3)
2453             self.failUnlessEqual(n, 10)
2454             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2455                                           1,
2456                                           seqnum,
2457                                           root_hash,
2458                                           k,
2459                                           n,
2460                                           segsize,
2461                                           datalen)
2462             self.failUnlessEqual(expected_prefix, prefix)
2463             self.failUnlessEqual(self.rref.read_count, 0)
2464         d.addCallback(_check_verinfo)
2465         # This is not enough data to read a block and its salt, so the
2466         # wrapper should fetch what it needs from the remote server.
2467         d.addCallback(_make_mr, 123)
2468         d.addCallback(lambda mr:
2469             mr.get_block_and_salt(0))
2470         def _check_block_and_salt((block, salt)):
2471             self.failUnlessEqual(block, self.block)
2472             self.failUnlessEqual(salt, self.salt)
2473             self.failUnlessEqual(self.rref.read_count, 1)
        # check the block and salt that the wrapper fetched remotely
        d.addCallback(_check_block_and_salt)
2474         # This should be enough data to read one block.
2475         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2476         d.addCallback(lambda mr:
2477             mr.get_block_and_salt(0))
2478         d.addCallback(_check_block_and_salt)
2479         return d
2480
2481
2482     def test_read_with_prefetched_sdmf_data(self):
2483         sdmf_data = self.build_test_sdmf_share()
2484         self.write_sdmf_share_to_server("si1")
2485         def _make_mr(ignored, length):
2486             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2487             return mr
2488
2489         d = defer.succeed(None)
2490         # This should be enough to get us the encoding parameters,
2491         # the offset table, and everything else we need to build a
2492         # verinfo tuple.
2493         d.addCallback(_make_mr, 123)
2494         d.addCallback(lambda mr:
2495             mr.get_verinfo())
2496         def _check_verinfo(verinfo):
2497             self.failUnless(verinfo)
2498             self.failUnlessEqual(len(verinfo), 9)
2499             (seqnum,
2500              root_hash,
2501              salt,
2502              segsize,
2503              datalen,
2504              k,
2505              n,
2506              prefix,
2507              offsets) = verinfo
2508             self.failUnlessEqual(seqnum, 0)
2509             self.failUnlessEqual(root_hash, self.root_hash)
2510             self.failUnlessEqual(salt, self.salt)
2511             self.failUnlessEqual(segsize, 36)
2512             self.failUnlessEqual(datalen, 36)
2513             self.failUnlessEqual(k, 3)
2514             self.failUnlessEqual(n, 10)
2515             expected_prefix = struct.pack(SIGNED_PREFIX,
2516                                           0,
2517                                           seqnum,
2518                                           root_hash,
2519                                           salt,
2520                                           k,
2521                                           n,
2522                                           segsize,
2523                                           datalen)
2524             self.failUnlessEqual(expected_prefix, prefix)
2525             self.failUnlessEqual(self.rref.read_count, 0)
2526         d.addCallback(_check_verinfo)
2527         # This shouldn't be enough to read any share data.
2528         d.addCallback(_make_mr, 123)
2529         d.addCallback(lambda mr:
2530             mr.get_block_and_salt(0))
2531         def _check_block_and_salt((block, salt)):
2532             self.failUnlessEqual(block, self.block * 6)
2533             self.failUnlessEqual(salt, self.salt)
2534             # TODO: Fix the read routine so that it reads only the data
2535             #       that it has cached if it can't read all of it.
2536             self.failUnlessEqual(self.rref.read_count, 2)
2537
2538         # This should be enough to locate and read the share data. Note
        # that _check_block_and_salt runs only after this second call, so its
        # read_count assertion covers both get_block_and_salt calls
        # cumulatively.
2539         d.addCallback(_make_mr, self.offsets['share_data'])
2540         d.addCallback(lambda mr:
2541             mr.get_block_and_salt(0))
2542         d.addCallback(_check_block_and_salt)
2543         return d
2544
2545
2546     def test_read_with_empty_mdmf_file(self):
2547         # Some tests upload a file with no contents in order to exercise
2548         # behavior unrelated to the file's actual content. The reader
2549         # should handle these cases gracefully.
2550         self.write_test_share_to_server("si1", empty=True)
2551         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2552         # We should be able to get the encoding parameters, and they
2553         # should be correct.
2554         d = defer.succeed(None)
2555         d.addCallback(lambda ignored:
2556             mr.get_encoding_parameters())
2557         def _check_encoding_parameters(params):
2558             self.failUnlessEqual(len(params), 4)
2559             k, n, segsize, datalen = params
2560             self.failUnlessEqual(k, 3)
2561             self.failUnlessEqual(n, 10)
2562             self.failUnlessEqual(segsize, 0)
2563             self.failUnlessEqual(datalen, 0)
2564         d.addCallback(_check_encoding_parameters)
2565
2566         # We should not be able to fetch a block, since there are no
2567         # blocks to fetch
2568         d.addCallback(lambda ignored:
2569             self.shouldFail(LayoutInvalid, "get block on empty file",
2570                             None,
2571                             mr.get_block_and_salt, 0))
2572         return d
2573
2574
2575     def test_read_with_empty_sdmf_file(self):
2576         self.write_sdmf_share_to_server("si1", empty=True)
2577         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2578         # We should be able to get the encoding parameters, and they
2579         # should be correct
2580         d = defer.succeed(None)
2581         d.addCallback(lambda ignored:
2582             mr.get_encoding_parameters())
2583         def _check_encoding_parameters(params):
2584             self.failUnlessEqual(len(params), 4)
2585             k, n, segsize, datalen = params
2586             self.failUnlessEqual(k, 3)
2587             self.failUnlessEqual(n, 10)
2588             self.failUnlessEqual(segsize, 0)
2589             self.failUnlessEqual(datalen, 0)
2590         d.addCallback(_check_encoding_parameters)
2591
2592         # An empty file has no blocks, so we should not be able to
2593         # fetch one.
2594         d.addCallback(lambda ignored:
2595             self.shouldFail(LayoutInvalid, "get block on an empty file",
2596                             None,
2597                             mr.get_block_and_salt, 0))
2598         return d
2599
2600
2601     def test_verinfo_with_sdmf_file(self):
2602         self.write_sdmf_share_to_server("si1")
2603         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2604         # We should be able to get the version information.
2605         d = defer.succeed(None)
2606         d.addCallback(lambda ignored:
2607             mr.get_verinfo())
2608         def _check_verinfo(verinfo):
2609             self.failUnless(verinfo)
2610             self.failUnlessEqual(len(verinfo), 9)
2611             (seqnum,
2612              root_hash,
2613              salt,
2614              segsize,
2615              datalen,
2616              k,
2617              n,
2618              prefix,
2619              offsets) = verinfo
2620             self.failUnlessEqual(seqnum, 0)
2621             self.failUnlessEqual(root_hash, self.root_hash)
2622             self.failUnlessEqual(salt, self.salt)
2623             self.failUnlessEqual(segsize, 36)
2624             self.failUnlessEqual(datalen, 36)
2625             self.failUnlessEqual(k, 3)
2626             self.failUnlessEqual(n, 10)
2627             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2628                                           0,
2629                                           seqnum,
2630                                           root_hash,
2631                                           salt,
2632                                           k,
2633                                           n,
2634                                           segsize,
2635                                           datalen)
2636             self.failUnlessEqual(prefix, expected_prefix)
2637             self.failUnlessEqual(offsets, self.offsets)
2638         d.addCallback(_check_verinfo)
2639         return d
2640
2641
2642     def test_verinfo_with_mdmf_file(self):
2643         self.write_test_share_to_server("si1")
2644         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2645         d = defer.succeed(None)
2646         d.addCallback(lambda ignored:
2647             mr.get_verinfo())
2648         def _check_verinfo(verinfo):
2649             self.failUnless(verinfo)
2650             self.failUnlessEqual(len(verinfo), 9)
2651             (seqnum,
2652              root_hash,
2653              IV,
2654              segsize,
2655              datalen,
2656              k,
2657              n,
2658              prefix,
2659              offsets) = verinfo
2660             self.failUnlessEqual(seqnum, 0)
2661             self.failUnlessEqual(root_hash, self.root_hash)
2662             self.failIf(IV)
2663             self.failUnlessEqual(segsize, 6)
2664             self.failUnlessEqual(datalen, 36)
2665             self.failUnlessEqual(k, 3)
2666             self.failUnlessEqual(n, 10)
2667             expected_prefix = struct.pack(">BQ32s BBQQ",
2668                                           1,
2669                                           seqnum,
2670                                           root_hash,
2671                                           k,
2672                                           n,
2673                                           segsize,
2674                                           datalen)
2675             self.failUnlessEqual(prefix, expected_prefix)
2676             self.failUnlessEqual(offsets, self.offsets)
2677         d.addCallback(_check_verinfo)
2678         return d
2679
2680
2681     def test_sdmf_writer(self):
2682         # Go through the motions of writing an SDMF share to the storage
2683         # server, then read it back from the server to confirm that the
2684         # share was written with the expected layout.
2685
2686         # We do this first so that the necessary instance variables get
2687         # set the way we want them for the tests below.
2688         data = self.build_test_sdmf_share()
2689         sdmfr = SDMFSlotWriteProxy(0,
2690                                    self.rref,
2691                                    "si1",
2692                                    self.secrets,
2693                                    0, 3, 10, 36, 36)
2694         # Put the block and salt.
2695         sdmfr.put_block(self.blockdata, 0, self.salt)
2696
2697         # Put the encprivkey
2698         sdmfr.put_encprivkey(self.encprivkey)
2699
2700         # Put the block and share hash chains
2701         sdmfr.put_blockhashes(self.block_hash_tree)
2702         sdmfr.put_sharehashes(self.share_hash_chain)
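
        # Put the root hash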
2703         sdmfr.put_root_hash(self.root_hash)
2704
2705         # Put the signature
2706         sdmfr.put_signature(self.signature)
2707
2708         # Put the verification key
2709         sdmfr.put_verification_key(self.verification_key)
2710
2711         # Now check to make sure that nothing has been written yet.
2712         self.failUnlessEqual(self.rref.write_count, 0)
2713
2714         # Now finish publishing
2715         d = sdmfr.finish_publishing()
2716         def _then(ignored):
2717             self.failUnlessEqual(self.rref.write_count, 1)
2718             read = self.ss.remote_slot_readv
2719             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2720                                  {0: [data]})
2721         d.addCallback(_then)
2722         return d
2723
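    # finish_publishing() performs the server-side test-and-set write: if
    # the writer's checkstring does not match the share already on disk,
    # the write is rejected and the result carries the current on-disk
    # test vector, which the next test uses to build a correct checkstring.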
2724
2725     def test_sdmf_writer_preexisting_share(self):
2726         data = self.build_test_sdmf_share()
2727         self.write_sdmf_share_to_server("si1")
2728
2729         # Now there is a share on the storage server. To successfully
2730         # write, we need to set the checkstring correctly. When we
2731         # don't, no write should occur.
2732         sdmfw = SDMFSlotWriteProxy(0,
2733                                    self.rref,
2734                                    "si1",
2735                                    self.secrets,
2736                                    1, 3, 10, 36, 36)
2737         sdmfw.put_block(self.blockdata, 0, self.salt)
2738
2739         # Put the encprivkey
2740         sdmfw.put_encprivkey(self.encprivkey)
2741
2742         # Put the block and share hash chains
2743         sdmfw.put_blockhashes(self.block_hash_tree)
2744         sdmfw.put_sharehashes(self.share_hash_chain)
2745
2746         # Put the root hash
2747         sdmfw.put_root_hash(self.root_hash)
2748
2749         # Put the signature
2750         sdmfw.put_signature(self.signature)
2751
2752         # Put the verification key
2753         sdmfw.put_verification_key(self.verification_key)
2754
2755         # We shouldn't have a checkstring yet
2756         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2757
2758         d = sdmfw.finish_publishing()
2759         def _then(results):
2760             self.failIf(results[0])
2761             # this is the correct checkstring
2762             self._expected_checkstring = results[1][0][0]
2763             return self._expected_checkstring
2764
2765         d.addCallback(_then)
2766         d.addCallback(sdmfw.set_checkstring)
2767         d.addCallback(lambda ignored:
2768             sdmfw.get_checkstring())
2769         d.addCallback(lambda checkstring:
2770             self.failUnlessEqual(checkstring, self._expected_checkstring))
2771         d.addCallback(lambda ignored:
2772             sdmfw.finish_publishing())
2773         def _then_again(results):
2774             self.failUnless(results[0])
2775             read = self.ss.remote_slot_readv
2776             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2777                                  {0: [struct.pack(">Q", 1)]})
2778             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2779                                  {0: [data[9:]]})
2780         d.addCallback(_then_again)
2781         return d
2782
2783
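# The SDMF prefix format asserted above (">BQ32s16s BBQQ") can be unpacked
# the same way it is packed. A minimal illustrative sketch (not used by the
# tests; the field names are descriptive labels, not identifiers from
# allmydata.mutable.layout):
def _unpack_sdmf_prefix_sketch(prefix):
    (version, seqnum, root_hash, salt,
     k, n, segsize, datalen) = struct.unpack(">BQ32s16s BBQQ", prefix)
    return (version, seqnum, root_hash, salt, k, n, segsize, datalen)
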
2784 class Stats(unittest.TestCase):
2785
2786     def setUp(self):
2787         self.sparent = LoggingServiceParent()
2788         self._lease_secret = itertools.count()
2789     def tearDown(self):
2790         return self.sparent.stopService()
2791
2792     def workdir(self, name):
2793         basedir = os.path.join("storage", "Server", name)
2794         return basedir
2795
2796     def create(self, name):
2797         workdir = self.workdir(name)
2798         ss = StorageServer(workdir, "\x00" * 20)
2799         ss.setServiceParent(self.sparent)
2800         return ss
2801
2802     def test_latencies(self):
2803         ss = self.create("test_latencies")
2804         for i in range(10000):
2805             ss.add_latency("allocate", 1.0 * i)
2806         for i in range(1000):
2807             ss.add_latency("renew", 1.0 * i)
2808         for i in range(20):
2809             ss.add_latency("write", 1.0 * i)
2810         for i in range(10):
2811             ss.add_latency("cancel", 2.0 * i)
2812         ss.add_latency("get", 5.0)
2813
2814         output = ss.get_latencies()
2815
2816         self.failUnlessEqual(sorted(output.keys()),
2817                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2818         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2819         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2820         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2821         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2822         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2823         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2824         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2825         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2826         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2827
2828         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2829         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2830         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2831         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2832         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2833         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2834         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2835         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2836         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2837
2838         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2839         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2840         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2841         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2842         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2843         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2844         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2845         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2846         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2847
2848         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2849         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2850         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2851         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2852         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2853         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2854         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2855         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2856         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2857
2858         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2859         self.failUnless(output["get"]["mean"] is None, output)
2860         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2861         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2862         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2863         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2864         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2865         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2866         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2867
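# The percentile assertions in Stats.test_latencies above are consistent
# with an order-statistic rule along these lines: each percentile is the
# sorted sample at index int(fraction * count), reported as None when too
# few samples have been observed for that fraction to be meaningful (e.g.
# the 99.9% point needs on the order of 1000 samples). An illustrative
# sketch of that rule, not the StorageServer implementation:
def _percentile_sketch(samples, fraction, min_samples):
    if len(samples) < min_samples:
        return None
    return sorted(samples)[int(fraction * len(samples))]
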
2868 def remove_tags(s):
2869     s = re.sub(r'<[^>]*>', ' ', s)
2870     s = re.sub(r'\s+', ' ', s)
2871     return s
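# For example, remove_tags("<h1>Total buckets: 0</h1>") yields
# " Total buckets: 0 ": tags become spaces and whitespace runs collapse,
# which is why the tests assert on plain-text substrings of the rendered
# HTML.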
2872
2873 class MyBucketCountingCrawler(BucketCountingCrawler):
2874     def finished_prefix(self, cycle, prefix):
2875         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2876         if self.hook_ds:
2877             d = self.hook_ds.pop(0)
2878             d.callback(None)
2879
2880 class MyStorageServer(StorageServer):
2881     def add_bucket_counter(self):
2882         statefile = os.path.join(self.storedir, "bucket_counter.state")
2883         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2884         self.bucket_counter.setServiceParent(self)
2885
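# Several tests below poll crawler state by rescheduling themselves with
# fireEventually() until a condition holds. The pattern they inline is
# equivalent to this sketch (illustrative only; the tests keep it inline
# so that each step can make extra assertions along the way):
def _poll_until_sketch(condition):
    d = fireEventually()
    def _check(ignored):
        if not condition():
            d2 = fireEventually()
            d2.addCallback(_check)
            return d2
    d.addCallback(_check)
    return d
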
2886 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2887
2888     def setUp(self):
2889         self.s = service.MultiService()
2890         self.s.startService()
2891     def tearDown(self):
2892         return self.s.stopService()
2893
2894     def test_bucket_counter(self):
2895         basedir = "storage/BucketCounter/bucket_counter"
2896         fileutil.make_dirs(basedir)
2897         ss = StorageServer(basedir, "\x00" * 20)
2898         # to make sure we capture the bucket-counting-crawler in the middle
2899         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2900         # also make it start sooner than usual.
2901         ss.bucket_counter.slow_start = 0
2902         orig_cpu_slice = ss.bucket_counter.cpu_slice
2903         ss.bucket_counter.cpu_slice = 0
2904         ss.setServiceParent(self.s)
2905
2906         w = StorageStatus(ss)
2907
2908         # this sample is before the crawler has started doing anything
2909         html = w.renderSynchronously()
2910         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2911         s = remove_tags(html)
2912         self.failUnlessIn("Accepting new shares: Yes", s)
2913         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2914         self.failUnlessIn("Total buckets: Not computed yet", s)
2915         self.failUnlessIn("Next crawl in", s)
2916
2917         # give the bucket-counting-crawler one tick to get started. The
2918         # cpu_slice=0 will force it to yield right after it processes the
2919         # first prefix
2920
2921         d = fireEventually()
2922         def _check(ignored):
2923             # are we really right after the first prefix?
2924             state = ss.bucket_counter.get_state()
2925             if state["last-complete-prefix"] is None:
2926                 d2 = fireEventually()
2927                 d2.addCallback(_check)
2928                 return d2
2929             self.failUnlessEqual(state["last-complete-prefix"],
2930                                  ss.bucket_counter.prefixes[0])
2931             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2932             html = w.renderSynchronously()
2933             s = remove_tags(html)
2934             self.failUnlessIn(" Current crawl ", s)
2935             self.failUnlessIn(" (next work in ", s)
2936         d.addCallback(_check)
2937
2938         # now give it enough time to complete a full cycle
2939         def _watch():
2940             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2941         d.addCallback(lambda ignored: self.poll(_watch))
2942         def _check2(ignored):
2943             ss.bucket_counter.cpu_slice = orig_cpu_slice
2944             html = w.renderSynchronously()
2945             s = remove_tags(html)
2946             self.failUnlessIn("Total buckets: 0 (the number of", s)
2947             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2948         d.addCallback(_check2)
2949         return d
2950
2951     def test_bucket_counter_cleanup(self):
2952         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2953         fileutil.make_dirs(basedir)
2954         ss = StorageServer(basedir, "\x00" * 20)
2955         # to make sure we capture the bucket-counting-crawler in the middle
2956         # of a cycle, we reach in and reduce its maximum slice time to 0.
2957         ss.bucket_counter.slow_start = 0
2958         orig_cpu_slice = ss.bucket_counter.cpu_slice
2959         ss.bucket_counter.cpu_slice = 0
2960         ss.setServiceParent(self.s)
2961
2962         d = fireEventually()
2963
2964         def _after_first_prefix(ignored):
2965             state = ss.bucket_counter.state
2966             if state["last-complete-prefix"] is None:
2967                 d2 = fireEventually()
2968                 d2.addCallback(_after_first_prefix)
2969                 return d2
2970             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2971             # now sneak in and mess with its state, to make sure it cleans up
2972             # properly at the end of the cycle
2973             self.failUnlessEqual(state["last-complete-prefix"],
2974                                  ss.bucket_counter.prefixes[0])
2975             state["bucket-counts"][-12] = {}
2976             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2977             ss.bucket_counter.save_state()
2978         d.addCallback(_after_first_prefix)
2979
2980         # now give it enough time to complete a cycle
2981         def _watch():
2982             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2983         d.addCallback(lambda ignored: self.poll(_watch))
2984         def _check2(ignored):
2985             ss.bucket_counter.cpu_slice = orig_cpu_slice
2986             s = ss.bucket_counter.get_state()
2987             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2988             self.failIf("bogusprefix!" in s["storage-index-samples"],
2989                         s["storage-index-samples"].keys())
2990         d.addCallback(_check2)
2991         return d
2992
2993     def test_bucket_counter_eta(self):
2994         basedir = "storage/BucketCounter/bucket_counter_eta"
2995         fileutil.make_dirs(basedir)
2996         ss = MyStorageServer(basedir, "\x00" * 20)
2997         ss.bucket_counter.slow_start = 0
2998         # these will be fired inside finished_prefix()
2999         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
3000         w = StorageStatus(ss)
3001
3002         d = defer.Deferred()
3003
3004         def _check_1(ignored):
3005             # no ETA is available yet
3006             html = w.renderSynchronously()
3007             s = remove_tags(html)
3008             self.failUnlessIn("complete (next work", s)
3009
3010         def _check_2(ignored):
3011             # one prefix has finished, so an ETA based upon that elapsed time
3012             # should be available.
3013             html = w.renderSynchronously()
3014             s = remove_tags(html)
3015             self.failUnlessIn("complete (ETA ", s)
3016
3017         def _check_3(ignored):
3018             # two prefixes have finished
3019             html = w.renderSynchronously()
3020             s = remove_tags(html)
3021             self.failUnlessIn("complete (ETA ", s)
3022             d.callback("done")
3023
3024         hooks[0].addCallback(_check_1).addErrback(d.errback)
3025         hooks[1].addCallback(_check_2).addErrback(d.errback)
3026         hooks[2].addCallback(_check_3).addErrback(d.errback)
3027
3028         ss.setServiceParent(self.s)
3029         return d
3030
3031 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3032     stop_after_first_bucket = False
3033     def process_bucket(self, *args, **kwargs):
3034         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3035         if self.stop_after_first_bucket:
3036             self.stop_after_first_bucket = False
3037             self.cpu_slice = -1.0
3038     def yielding(self, sleep_time):
3039         if not self.stop_after_first_bucket:
3040             self.cpu_slice = 500
3041
3042 class BrokenStatResults:
3043     pass
3044 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3045     def stat(self, fn):
3046         s = os.stat(fn)
3047         bsr = BrokenStatResults()
3048         for attrname in dir(s):
3049             if attrname.startswith("_"):
3050                 continue
3051             if attrname == "st_blocks":
3052                 continue
3053             setattr(bsr, attrname, getattr(s, attrname))
3054         return bsr
3055
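# No_ST_BLOCKS_LeaseCheckingCrawler simulates platforms whose stat results
# lack st_blocks by copying every other attribute onto a plain object; the
# crawler is expected to fall back gracefully when it cannot compute
# on-disk block usage.
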
3056 class InstrumentedStorageServer(StorageServer):
3057     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3058 class No_ST_BLOCKS_StorageServer(StorageServer):
3059     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3060
3061 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3062
3063     def setUp(self):
3064         self.s = service.MultiService()
3065         self.s.startService()
3066     def tearDown(self):
3067         return self.s.stopService()
3068
3069     def make_shares(self, ss):
3070         def make(si):
3071             return (si, hashutil.tagged_hash("renew", si),
3072                     hashutil.tagged_hash("cancel", si))
3073         def make_mutable(si):
3074             return (si, hashutil.tagged_hash("renew", si),
3075                     hashutil.tagged_hash("cancel", si),
3076                     hashutil.tagged_hash("write-enabler", si))
3077         def make_extra_lease(si, num):
3078             return (hashutil.tagged_hash("renew-%d" % num, si),
3079                     hashutil.tagged_hash("cancel-%d" % num, si))
3080
3081         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3082         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3083         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3084         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3085         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3086         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3087         sharenums = [0]
3088         canary = FakeCanary()
3089         # note: 'tahoe debug dump-share' will not handle this file, since the
3090         # inner contents are not a valid CHK share
3091         data = "\xff" * 1000
3092
3093         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3094                                          1000, canary)
3095         w[0].remote_write(0, data)
3096         w[0].remote_close()
3097
3098         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3099                                          1000, canary)
3100         w[0].remote_write(0, data)
3101         w[0].remote_close()
3102         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3103
3104         writev = ss.remote_slot_testv_and_readv_and_writev
3105         writev(mutable_si_2, (we2, rs2, cs2),
3106                {0: ([], [(0,data)], len(data))}, [])
3107         writev(mutable_si_3, (we3, rs3, cs3),
3108                {0: ([], [(0,data)], len(data))}, [])
3109         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3110
3111         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3112         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3113         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3114
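    # make_shares() derives every secret deterministically from the storage
    # index via tagged hashes (e.g. rs0 is hashutil.tagged_hash("renew",
    # "\x00" * 16)), so the same SIs always produce the same renew/cancel
    # secrets; the extra leases use numbered tags to stay distinct.
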
3115     def test_basic(self):
3116         basedir = "storage/LeaseCrawler/basic"
3117         fileutil.make_dirs(basedir)
3118         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3119         # make it start sooner than usual.
3120         lc = ss.lease_checker
3121         lc.slow_start = 0
3122         lc.cpu_slice = 500
3123         lc.stop_after_first_bucket = True
3124         webstatus = StorageStatus(ss)
3125
3126         # create a few shares, with some leases on them
3127         self.make_shares(ss)
3128         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3129
3130         # add a non-sharefile to exercise another code path
3131         fn = os.path.join(ss.sharedir,
3132                           storage_index_to_dir(immutable_si_0),
3133                           "not-a-share")
3134         f = open(fn, "wb")
3135         f.write("I am not a share.\n")
3136         f.close()
3137
3138         # this is before the crawl has started, so we're not in a cycle yet
3139         initial_state = lc.get_state()
3140         self.failIf(lc.get_progress()["cycle-in-progress"])
3141         self.failIfIn("cycle-to-date", initial_state)
3142         self.failIfIn("estimated-remaining-cycle", initial_state)
3143         self.failIfIn("estimated-current-cycle", initial_state)
3144         self.failUnlessIn("history", initial_state)
3145         self.failUnlessEqual(initial_state["history"], {})
3146
3147         ss.setServiceParent(self.s)
3148
3149         DAY = 24*60*60
3150
3151         d = fireEventually()
3152
3153         # now examine the state right after the first bucket has been
3154         # processed.
3155         def _after_first_bucket(ignored):
3156             initial_state = lc.get_state()
3157             if "cycle-to-date" not in initial_state:
3158                 d2 = fireEventually()
3159                 d2.addCallback(_after_first_bucket)
3160                 return d2
3161             self.failUnlessIn("cycle-to-date", initial_state)
3162             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3163             self.failUnlessIn("estimated-current-cycle", initial_state)
3164             self.failUnlessIn("history", initial_state)
3165             self.failUnlessEqual(initial_state["history"], {})
3166
3167             so_far = initial_state["cycle-to-date"]
3168             self.failUnlessEqual(so_far["expiration-enabled"], False)
3169             self.failUnlessIn("configured-expiration-mode", so_far)
3170             self.failUnlessIn("lease-age-histogram", so_far)
3171             lah = so_far["lease-age-histogram"]
3172             self.failUnlessEqual(type(lah), list)
3173             self.failUnlessEqual(len(lah), 1)
3174             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3175             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3176             self.failUnlessEqual(so_far["corrupt-shares"], [])
3177             sr1 = so_far["space-recovered"]
3178             self.failUnlessEqual(sr1["examined-buckets"], 1)
3179             self.failUnlessEqual(sr1["examined-shares"], 1)
3180             self.failUnlessEqual(sr1["actual-shares"], 0)
3181             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3182             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3183             left = initial_state["estimated-remaining-cycle"]
3184             sr2 = left["space-recovered"]
3185             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3186             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3187             self.failIfEqual(sr2["actual-shares"], None)
3188             self.failIfEqual(sr2["configured-diskbytes"], None)
3189             self.failIfEqual(sr2["original-sharebytes"], None)
3190         d.addCallback(_after_first_bucket)
3191         d.addCallback(lambda ign: self.render1(webstatus))
3192         def _check_html_in_cycle(html):
3193             s = remove_tags(html)
3194             self.failUnlessIn("So far, this cycle has examined "
3195                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3196             self.failUnlessIn("and has recovered: "
3197                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3198                               "0 B (0 B / 0 B)", s)
3199             self.failUnlessIn("If expiration were enabled, "
3200                               "we would have recovered: "
3201                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3202                               " 0 B (0 B / 0 B) by now", s)
3203             self.failUnlessIn("and the remainder of this cycle "
3204                               "would probably recover: "
3205                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3206                               " 0 B (0 B / 0 B)", s)
3207             self.failUnlessIn("and the whole cycle would probably recover: "
3208                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3209                               " 0 B (0 B / 0 B)", s)
3210             self.failUnlessIn("if we were strictly using each lease's default "
3211                               "31-day lease lifetime", s)
3212             self.failUnlessIn("this cycle would be expected to recover: ", s)
3213         d.addCallback(_check_html_in_cycle)
3214
3215         # wait for the crawler to finish the first cycle. Nothing should have
3216         # been removed.
3217         def _wait():
3218             return bool(lc.get_state()["last-cycle-finished"] is not None)
3219         d.addCallback(lambda ign: self.poll(_wait))
3220
3221         def _after_first_cycle(ignored):
3222             s = lc.get_state()
3223             self.failIf("cycle-to-date" in s)
3224             self.failIf("estimated-remaining-cycle" in s)
3225             self.failIf("estimated-current-cycle" in s)
3226             last = s["history"][0]
3227             self.failUnlessIn("cycle-start-finish-times", last)
3228             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3229             self.failUnlessEqual(last["expiration-enabled"], False)
3230             self.failUnlessIn("configured-expiration-mode", last)
3231
3232             self.failUnlessIn("lease-age-histogram", last)
3233             lah = last["lease-age-histogram"]
3234             self.failUnlessEqual(type(lah), list)
3235             self.failUnlessEqual(len(lah), 1)
3236             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3237
3238             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3239             self.failUnlessEqual(last["corrupt-shares"], [])
3240
3241             rec = last["space-recovered"]
3242             self.failUnlessEqual(rec["examined-buckets"], 4)
3243             self.failUnlessEqual(rec["examined-shares"], 4)
3244             self.failUnlessEqual(rec["actual-buckets"], 0)
3245             self.failUnlessEqual(rec["original-buckets"], 0)
3246             self.failUnlessEqual(rec["configured-buckets"], 0)
3247             self.failUnlessEqual(rec["actual-shares"], 0)
3248             self.failUnlessEqual(rec["original-shares"], 0)
3249             self.failUnlessEqual(rec["configured-shares"], 0)
3250             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3251             self.failUnlessEqual(rec["original-diskbytes"], 0)
3252             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3253             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3254             self.failUnlessEqual(rec["original-sharebytes"], 0)
3255             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3256
3257             def _get_sharefile(si):
3258                 return list(ss._iter_share_files(si))[0]
3259             def count_leases(si):
3260                 return len(list(_get_sharefile(si).get_leases()))
3261             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3262             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3263             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3264             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3265         d.addCallback(_after_first_cycle)
3266         d.addCallback(lambda ign: self.render1(webstatus))
3267         def _check_html(html):
3268             s = remove_tags(html)
3269             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3270                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3271             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3272                               "(2 mutable / 2 immutable),", s)
3273             self.failUnlessIn("but expiration was not enabled", s)
3274         d.addCallback(_check_html)
3275         d.addCallback(lambda ign: self.render_json(webstatus))
3276         def _check_json(json):
3277             data = simplejson.loads(json)
3278             self.failUnlessIn("lease-checker", data)
3279             self.failUnlessIn("lease-checker-progress", data)
3280         d.addCallback(_check_json)
3281         return d
3282
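    # Note: the lease-age-histogram entries asserted in test_basic are
    # (min_age, max_age, count) tuples, so [(0.0, DAY, 6)] means all six
    # leases fell into the first one-day age bucket.
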
3283     def backdate_lease(self, sf, renew_secret, new_expire_time):
3284         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3285         # "renew" a lease with a new_expire_time that is older than what the
3286         # current lease has), so we have to reach inside it.
3287         for i,lease in enumerate(sf.get_leases()):
3288             if lease.renew_secret == renew_secret:
3289                 lease.expiration_time = new_expire_time
3290                 f = open(sf.home, 'rb+')
3291                 sf._write_lease_record(f, i, lease)
3292                 f.close()
3293                 return
3294         raise IndexError("unable to renew non-existent lease")
3295
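    # Leases currently record only an expiration time (their creation or
    # renewal time plus the 31-day default duration), so the cutoff-date
    # tests below fake "this lease was last renewed N seconds ago" by
    # writing an expiration time of now - N + 31 days, while the age-mode
    # test simply writes an expiration time that is already in the past.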
3296     def test_expire_age(self):
3297         basedir = "storage/LeaseCrawler/expire_age"
3298         fileutil.make_dirs(basedir)
3299         # setting expiration_override_lease_duration to 2000 means that
3300         # any lease which is more than 2000s old will be expired.
3301         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3302                                        expiration_enabled=True,
3303                                        expiration_mode="age",
3304                                        expiration_override_lease_duration=2000)
3305         # make it start sooner than usual.
3306         lc = ss.lease_checker
3307         lc.slow_start = 0
3308         lc.stop_after_first_bucket = True
3309         webstatus = StorageStatus(ss)
3310
3311         # create a few shares, with some leases on them
3312         self.make_shares(ss)
3313         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3314
3315         def count_shares(si):
3316             return len(list(ss._iter_share_files(si)))
3317         def _get_sharefile(si):
3318             return list(ss._iter_share_files(si))[0]
3319         def count_leases(si):
3320             return len(list(_get_sharefile(si).get_leases()))
3321
3322         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3323         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3324         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3325         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3326         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3327         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3328         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3329         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3330
3331         # artificially crank back the expiration time on the first lease of
3332         # each share, to make it look like it expired 1000s ago.
3333         # Some shares have an extra lease which is set to expire at the
3334         # default time in 31 days from now (age=31days). We then run the
3335         # crawler, which will expire the first lease, making some shares get
3336         # deleted and others stay alive (with one remaining lease)
3337         now = time.time()
3338
3339         sf0 = _get_sharefile(immutable_si_0)
3340         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3341         sf0_size = os.stat(sf0.home).st_size
3342
3343         # immutable_si_1 gets an extra lease
3344         sf1 = _get_sharefile(immutable_si_1)
3345         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3346
3347         sf2 = _get_sharefile(mutable_si_2)
3348         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3349         sf2_size = os.stat(sf2.home).st_size
3350
3351         # mutable_si_3 gets an extra lease
3352         sf3 = _get_sharefile(mutable_si_3)
3353         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3354
3355         ss.setServiceParent(self.s)
3356
3357         d = fireEventually()
3358         # examine the state right after the first bucket has been processed
3359         def _after_first_bucket(ignored):
3360             p = lc.get_progress()
3361             if not p["cycle-in-progress"]:
3362                 d2 = fireEventually()
3363                 d2.addCallback(_after_first_bucket)
3364                 return d2
3365         d.addCallback(_after_first_bucket)
3366         d.addCallback(lambda ign: self.render1(webstatus))
3367         def _check_html_in_cycle(html):
3368             s = remove_tags(html)
3369             # the first bucket encountered gets deleted, and its prefix
3370             # happens to be about 1/5th of the way through the ring, so the
3371             # predictor thinks we'll have 5 shares and that we'll delete them
3372             # all. This part of the test depends upon the SIs landing right
3373             # where they do now.
3374             self.failUnlessIn("The remainder of this cycle is expected to "
3375                               "recover: 4 shares, 4 buckets", s)
3376             self.failUnlessIn("The whole cycle is expected to examine "
3377                               "5 shares in 5 buckets and to recover: "
3378                               "5 shares, 5 buckets", s)
3379         d.addCallback(_check_html_in_cycle)
3380
3381         # wait for the crawler to finish the first cycle. Two shares should
3382         # have been removed
3383         def _wait():
3384             return bool(lc.get_state()["last-cycle-finished"] is not None)
3385         d.addCallback(lambda ign: self.poll(_wait))
3386
3387         def _after_first_cycle(ignored):
3388             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3389             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3390             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3391             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3392             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3393             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3394
3395             s = lc.get_state()
3396             last = s["history"][0]
3397
3398             self.failUnlessEqual(last["expiration-enabled"], True)
3399             self.failUnlessEqual(last["configured-expiration-mode"],
3400                                  ("age", 2000, None, ("mutable", "immutable")))
3401             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3402
3403             rec = last["space-recovered"]
3404             self.failUnlessEqual(rec["examined-buckets"], 4)
3405             self.failUnlessEqual(rec["examined-shares"], 4)
3406             self.failUnlessEqual(rec["actual-buckets"], 2)
3407             self.failUnlessEqual(rec["original-buckets"], 2)
3408             self.failUnlessEqual(rec["configured-buckets"], 2)
3409             self.failUnlessEqual(rec["actual-shares"], 2)
3410             self.failUnlessEqual(rec["original-shares"], 2)
3411             self.failUnlessEqual(rec["configured-shares"], 2)
3412             size = sf0_size + sf2_size
3413             self.failUnlessEqual(rec["actual-sharebytes"], size)
3414             self.failUnlessEqual(rec["original-sharebytes"], size)
3415             self.failUnlessEqual(rec["configured-sharebytes"], size)
3416             # different platforms have different notions of "blocks used by
3417             # this file", so merely assert that it's a non-negative number
3418             self.failUnless(rec["actual-diskbytes"] >= 0,
3419                             rec["actual-diskbytes"])
3420             self.failUnless(rec["original-diskbytes"] >= 0,
3421                             rec["original-diskbytes"])
3422             self.failUnless(rec["configured-diskbytes"] >= 0,
3423                             rec["configured-diskbytes"])
3424         d.addCallback(_after_first_cycle)
3425         d.addCallback(lambda ign: self.render1(webstatus))
3426         def _check_html(html):
3427             s = remove_tags(html)
3428             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3429             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3430             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3431         d.addCallback(_check_html)
3432         return d
3433
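    # test_expire_age (above) expires leases older than a configured
    # duration; this next test exercises the other mode, "cutoff-date",
    # which expires leases last renewed before a fixed point in time.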
3434     def test_expire_cutoff_date(self):
3435         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3436         fileutil.make_dirs(basedir)
3437         # setting cutoff-date to 2000 seconds ago means that any lease which
3438         # is more than 2000s old will be expired.
3439         now = time.time()
3440         then = int(now - 2000)
3441         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3442                                        expiration_enabled=True,
3443                                        expiration_mode="cutoff-date",
3444                                        expiration_cutoff_date=then)
3445         # make it start sooner than usual.
3446         lc = ss.lease_checker
3447         lc.slow_start = 0
3448         lc.stop_after_first_bucket = True
3449         webstatus = StorageStatus(ss)
3450
3451         # create a few shares, with some leases on them
3452         self.make_shares(ss)
3453         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3454
3455         def count_shares(si):
3456             return len(list(ss._iter_share_files(si)))
3457         def _get_sharefile(si):
3458             return list(ss._iter_share_files(si))[0]
3459         def count_leases(si):
3460             return len(list(_get_sharefile(si).get_leases()))
3461
3462         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3463         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3464         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3465         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3466         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3467         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3468         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3469         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3470
3471         # artificially crank back the expiration time on the first lease of
3472         # each share, to make it look like it was renewed 3000s ago. To achieve
3473         # this, we need to set the expiration time to now-3000+31days. This
3474         # will change when the lease format is improved to contain both
3475         # create/renew time and duration.
3476         new_expiration_time = now - 3000 + 31*24*60*60
3477
3478         # Some shares have an extra lease which is set to expire at the
3479         # default time in 31 days from now (age=31days). We then run the
3480         # crawler, which will expire the first lease, making some shares get
3481         # deleted and others stay alive (with one remaining lease)
3482
3483         sf0 = _get_sharefile(immutable_si_0)
3484         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3485         sf0_size = os.stat(sf0.home).st_size
3486
3487         # immutable_si_1 gets an extra lease
3488         sf1 = _get_sharefile(immutable_si_1)
3489         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3490
3491         sf2 = _get_sharefile(mutable_si_2)
3492         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3493         sf2_size = os.stat(sf2.home).st_size
3494
3495         # mutable_si_3 gets an extra lease
3496         sf3 = _get_sharefile(mutable_si_3)
3497         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3498
3499         ss.setServiceParent(self.s)
3500
3501         d = fireEventually()
3502         # examine the state right after the first bucket has been processed
3503         def _after_first_bucket(ignored):
3504             p = lc.get_progress()
3505             if not p["cycle-in-progress"]:
3506                 d2 = fireEventually()
3507                 d2.addCallback(_after_first_bucket)
3508                 return d2
3509         d.addCallback(_after_first_bucket)
3510         d.addCallback(lambda ign: self.render1(webstatus))
3511         def _check_html_in_cycle(html):
3512             s = remove_tags(html)
3513             # the first bucket encountered gets deleted, and its prefix
3514             # happens to be about 1/5th of the way through the ring, so the
3515             # predictor thinks we'll have 5 shares and that we'll delete them
3516             # all. This part of the test depends upon the SIs landing right
3517             # where they do now.
3518             self.failUnlessIn("The remainder of this cycle is expected to "
3519                               "recover: 4 shares, 4 buckets", s)
3520             self.failUnlessIn("The whole cycle is expected to examine "
3521                               "5 shares in 5 buckets and to recover: "
3522                               "5 shares, 5 buckets", s)
3523         d.addCallback(_check_html_in_cycle)
3524
3525         # wait for the crawler to finish the first cycle. Two shares should
3526         # have been removed
3527         def _wait():
3528             return bool(lc.get_state()["last-cycle-finished"] is not None)
3529         d.addCallback(lambda ign: self.poll(_wait))
3530
3531         def _after_first_cycle(ignored):
3532             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3533             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3534             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3535             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3536             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3537             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3538
3539             s = lc.get_state()
3540             last = s["history"][0]
3541
3542             self.failUnlessEqual(last["expiration-enabled"], True)
3543             self.failUnlessEqual(last["configured-expiration-mode"],
3544                                  ("cutoff-date", None, then,
3545                                   ("mutable", "immutable")))
3546             self.failUnlessEqual(last["leases-per-share-histogram"],
3547                                  {1: 2, 2: 2})
3548
3549             rec = last["space-recovered"]
3550             self.failUnlessEqual(rec["examined-buckets"], 4)
3551             self.failUnlessEqual(rec["examined-shares"], 4)
3552             self.failUnlessEqual(rec["actual-buckets"], 2)
3553             self.failUnlessEqual(rec["original-buckets"], 0)
3554             self.failUnlessEqual(rec["configured-buckets"], 2)
3555             self.failUnlessEqual(rec["actual-shares"], 2)
3556             self.failUnlessEqual(rec["original-shares"], 0)
3557             self.failUnlessEqual(rec["configured-shares"], 2)
3558             size = sf0_size + sf2_size
3559             self.failUnlessEqual(rec["actual-sharebytes"], size)
3560             self.failUnlessEqual(rec["original-sharebytes"], 0)
3561             self.failUnlessEqual(rec["configured-sharebytes"], size)
3562             # different platforms have different notions of "blocks used by
3563             # this file", so merely assert that it's a non-negative number
3564             self.failUnless(rec["actual-diskbytes"] >= 0,
3565                             rec["actual-diskbytes"])
3566             self.failUnless(rec["original-diskbytes"] >= 0,
3567                             rec["original-diskbytes"])
3568             self.failUnless(rec["configured-diskbytes"] >= 0,
3569                             rec["configured-diskbytes"])
3570         d.addCallback(_after_first_cycle)
3571         d.addCallback(lambda ign: self.render1(webstatus))
3572         def _check_html(html):
3573             s = remove_tags(html)
3574             self.failUnlessIn("Expiration Enabled:"
3575                               " expired leases will be removed", s)
3576             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3577             substr = "Leases created or last renewed before %s will be considered expired." % date
3578             self.failUnlessIn(substr, s)
3579             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3580         d.addCallback(_check_html)
3581         return d
3582
3583     def test_only_immutable(self):
3584         basedir = "storage/LeaseCrawler/only_immutable"
3585         fileutil.make_dirs(basedir)
3586         now = time.time()
3587         then = int(now - 2000)
3588         ss = StorageServer(basedir, "\x00" * 20,
3589                            expiration_enabled=True,
3590                            expiration_mode="cutoff-date",
3591                            expiration_cutoff_date=then,
3592                            expiration_sharetypes=("immutable",))
3593         lc = ss.lease_checker
3594         lc.slow_start = 0
3595         webstatus = StorageStatus(ss)
3596
3597         self.make_shares(ss)
3598         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3599         # backdate all leases so their grant times fall before the cutoff
3600         new_expiration_time = now - 3000 + 31*24*60*60
3601
3602         def count_shares(si):
3603             return len(list(ss._iter_share_files(si)))
3604         def _get_sharefile(si):
3605             return list(ss._iter_share_files(si))[0]
3606         def count_leases(si):
3607             return len(list(_get_sharefile(si).get_leases()))
3608
3609         sf0 = _get_sharefile(immutable_si_0)
3610         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3611         sf1 = _get_sharefile(immutable_si_1)
3612         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3613         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3614         sf2 = _get_sharefile(mutable_si_2)
3615         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3616         sf3 = _get_sharefile(mutable_si_3)
3617         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3618         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3619
3620         ss.setServiceParent(self.s)
3621         def _wait():
3622             return lc.get_state()["last-cycle-finished"] is not None
3623         d = self.poll(_wait)
3624
3625         def _after_first_cycle(ignored):
3626             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3627             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3628             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3629             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3630             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3631             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3632         d.addCallback(_after_first_cycle)
3633         d.addCallback(lambda ign: self.render1(webstatus))
3634         def _check_html(html):
3635             s = remove_tags(html)
3636             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3637         d.addCallback(_check_html)
3638         return d
3639
3640     def test_only_mutable(self):
3641         basedir = "storage/LeaseCrawler/only_mutable"
3642         fileutil.make_dirs(basedir)
3643         now = time.time()
3644         then = int(now - 2000)
3645         ss = StorageServer(basedir, "\x00" * 20,
3646                            expiration_enabled=True,
3647                            expiration_mode="cutoff-date",
3648                            expiration_cutoff_date=then,
3649                            expiration_sharetypes=("mutable",))
3650         lc = ss.lease_checker
3651         lc.slow_start = 0
3652         webstatus = StorageStatus(ss)
3653
3654         self.make_shares(ss)
3655         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3656         # backdate all leases so their grant times fall before the cutoff
3657         new_expiration_time = now - 3000 + 31*24*60*60
3658
3659         def count_shares(si):
3660             return len(list(ss._iter_share_files(si)))
3661         def _get_sharefile(si):
3662             return list(ss._iter_share_files(si))[0]
3663         def count_leases(si):
3664             return len(list(_get_sharefile(si).get_leases()))
3665
3666         sf0 = _get_sharefile(immutable_si_0)
3667         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3668         sf1 = _get_sharefile(immutable_si_1)
3669         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3670         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3671         sf2 = _get_sharefile(mutable_si_2)
3672         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3673         sf3 = _get_sharefile(mutable_si_3)
3674         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3675         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3676
3677         ss.setServiceParent(self.s)
3678         def _wait():
3679             return lc.get_state()["last-cycle-finished"] is not None
3680         d = self.poll(_wait)
3681
3682         def _after_first_cycle(ignored):
3683             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3684             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3685             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3686             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3687             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3688             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3689         d.addCallback(_after_first_cycle)
3690         d.addCallback(lambda ign: self.render1(webstatus))
3691         def _check_html(html):
3692             s = remove_tags(html)
3693             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3694         d.addCallback(_check_html)
3695         return d
3696
3697     def test_bad_mode(self):
3698         basedir = "storage/LeaseCrawler/bad_mode"
3699         fileutil.make_dirs(basedir)
3700         e = self.failUnlessRaises(ValueError,
3701                                   StorageServer, basedir, "\x00" * 20,
3702                                   expiration_mode="bogus")
3703         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3704
3705     def test_parse_duration(self):
3706         DAY = 24*60*60
3707         MONTH = 31*DAY
3708         YEAR = 365*DAY
3709         p = time_format.parse_duration
3710         self.failUnlessEqual(p("7days"), 7*DAY)
3711         self.failUnlessEqual(p("31day"), 31*DAY)
3712         self.failUnlessEqual(p("60 days"), 60*DAY)
3713         self.failUnlessEqual(p("2mo"), 2*MONTH)
3714         self.failUnlessEqual(p("3 month"), 3*MONTH)
3715         self.failUnlessEqual(p("2years"), 2*YEAR)
3716         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3717         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3718
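     # A minimal sketch of the suffix matching that time_format.parse_duration
     # appears to perform, per the cases above (hypothetical helper, not the
     # real implementation; note the 31-day month and 365-day year):
     def _sketch_parse_duration(self, s):
         DAY = 24*60*60
         units = [("day", DAY), ("month", 31*DAY), ("mo", 31*DAY),
                  ("year", 365*DAY)]
         s = s.strip().lower()
         for suffix, seconds in units:
             if suffix in s:
                 # everything before the unit suffix is the count
                 count = s.split(suffix)[0].strip()
                 return int(count) * seconds
         raise ValueError("no unit (like day, month, or year) in '%s'" % s)
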
3719     def test_parse_date(self):
3720         p = time_format.parse_date
3721         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3722         self.failUnlessEqual(p("2009-03-18"), 1237334400)
3723
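     # A sketch of one way to compute the expected value with the stdlib
     # (hypothetical helper; not necessarily how time_format does it):
     def _sketch_parse_date(self, datestr):
         import calendar
         # interpret the date as midnight UTC, then convert the struct_time
         # back to a POSIX timestamp: "2009-03-18" -> 1237334400
         return calendar.timegm(time.strptime(datestr, "%Y-%m-%d"))
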
3724     def test_limited_history(self):
3725         basedir = "storage/LeaseCrawler/limited_history"
3726         fileutil.make_dirs(basedir)
3727         ss = StorageServer(basedir, "\x00" * 20)
3728         # make it start sooner than usual.
3729         lc = ss.lease_checker
3730         lc.slow_start = 0
3731         lc.cpu_slice = 500
3732
3733         # create a few shares, with some leases on them
3734         self.make_shares(ss)
3735
3736         ss.setServiceParent(self.s)
3737
3738         def _wait_until_15_cycles_done():
3739             last = lc.state["last-cycle-finished"]
3740             if last is not None and last >= 15:
3741                 return True
3742             if lc.timer:
3743                 lc.timer.reset(0)
3744             return False
3745         d = self.poll(_wait_until_15_cycles_done)
3746
3747         def _check(ignored):
3748             s = lc.get_state()
3749             h = s["history"]
3750             self.failUnlessEqual(len(h), 10)
3751             self.failUnlessEqual(max(h.keys()), 15)
3752             self.failUnlessEqual(min(h.keys()), 6)
3753         d.addCallback(_check)
3754         return d
3755
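     # A sketch of the bounded history the assertions above rely on: after
     # 16 cycles (numbered 0..15) only the 10 most recent survive, leaving
     # keys 6..15 (hypothetical helper; the real bookkeeping lives in the
     # lease crawler):
     def _sketch_prune_history(self, history, maxlen=10):
         # history maps cycle number -> cycle summary dict
         while len(history) > maxlen:
             del history[min(history)]
         return history
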
3756     def test_unpredictable_future(self):
3757         basedir = "storage/LeaseCrawler/unpredictable_future"
3758         fileutil.make_dirs(basedir)
3759         ss = StorageServer(basedir, "\x00" * 20)
3760         # make it start sooner than usual.
3761         lc = ss.lease_checker
3762         lc.slow_start = 0
3763         lc.cpu_slice = -1.0 # stop quickly
3764
3765         self.make_shares(ss)
3766
3767         ss.setServiceParent(self.s)
3768
3769         d = fireEventually()
3770         def _check(ignored):
3771             # this should fire after the first bucket is complete, but before
3772             # the first prefix is complete, so the progress-measurer won't
3773             # think we've gotten far enough to raise our percent-complete
3774             # above 0%, triggering the cannot-predict-the-future code in
3775             # expirer.py. This will have to change if/when the
3776             # progress-measurer gets smart enough to count buckets (we'll
3777             # have to interrupt it even earlier, before it's finished the
3778             # first bucket).
3779             s = lc.get_state()
3780             if "cycle-to-date" not in s:
3781                 d2 = fireEventually()
3782                 d2.addCallback(_check)
3783                 return d2
3784             self.failUnlessIn("cycle-to-date", s)
3785             self.failUnlessIn("estimated-remaining-cycle", s)
3786             self.failUnlessIn("estimated-current-cycle", s)
3787
3788             left = s["estimated-remaining-cycle"]["space-recovered"]
3789             self.failUnlessEqual(left["actual-buckets"], None)
3790             self.failUnlessEqual(left["original-buckets"], None)
3791             self.failUnlessEqual(left["configured-buckets"], None)
3792             self.failUnlessEqual(left["actual-shares"], None)
3793             self.failUnlessEqual(left["original-shares"], None)
3794             self.failUnlessEqual(left["configured-shares"], None)
3795             self.failUnlessEqual(left["actual-diskbytes"], None)
3796             self.failUnlessEqual(left["original-diskbytes"], None)
3797             self.failUnlessEqual(left["configured-diskbytes"], None)
3798             self.failUnlessEqual(left["actual-sharebytes"], None)
3799             self.failUnlessEqual(left["original-sharebytes"], None)
3800             self.failUnlessEqual(left["configured-sharebytes"], None)
3801
3802             full = s["estimated-current-cycle"]["space-recovered"]
3803             self.failUnlessEqual(full["actual-buckets"], None)
3804             self.failUnlessEqual(full["original-buckets"], None)
3805             self.failUnlessEqual(full["configured-buckets"], None)
3806             self.failUnlessEqual(full["actual-shares"], None)
3807             self.failUnlessEqual(full["original-shares"], None)
3808             self.failUnlessEqual(full["configured-shares"], None)
3809             self.failUnlessEqual(full["actual-diskbytes"], None)
3810             self.failUnlessEqual(full["original-diskbytes"], None)
3811             self.failUnlessEqual(full["configured-diskbytes"], None)
3812             self.failUnlessEqual(full["actual-sharebytes"], None)
3813             self.failUnlessEqual(full["original-sharebytes"], None)
3814             self.failUnlessEqual(full["configured-sharebytes"], None)
3815
3816         d.addCallback(_check)
3817         return d
3818
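     # Why every estimate above is None: with 0% of the cycle finished there
     # is nothing to extrapolate from. A sketch of that guard (hypothetical
     # helper; the real code is the cannot-predict-the-future path in
     # expirer.py):
     def _sketch_estimate_remaining(self, count_so_far, fraction_done):
         if fraction_done == 0:
             return None # cannot predict the future from zero progress
         # linear extrapolation: total = so_far / fraction, remaining follows
         return count_so_far * (1.0 / fraction_done - 1.0)
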
3819     def test_no_st_blocks(self):
3820         basedir = "storage/LeaseCrawler/no_st_blocks"
3821         fileutil.make_dirs(basedir)
3822         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3823                                         expiration_mode="age",
3824                                         expiration_override_lease_duration=-1000)
3825         # a negative expiration_override_lease_duration means every lease
3826         # has already expired, so the "configured-" space-recovered counts
3827         # will be non-zero
3828
3829         # make it start sooner than usual.
3830         lc = ss.lease_checker
3831         lc.slow_start = 0
3832
3833         self.make_shares(ss)
3834         ss.setServiceParent(self.s)
3835         def _wait():
3836             return lc.get_state()["last-cycle-finished"] is not None
3837         d = self.poll(_wait)
3838
3839         def _check(ignored):
3840             s = lc.get_state()
3841             last = s["history"][0]
3842             rec = last["space-recovered"]
3843             self.failUnlessEqual(rec["configured-buckets"], 4)
3844             self.failUnlessEqual(rec["configured-shares"], 4)
3845             self.failUnless(rec["configured-sharebytes"] > 0,
3846                             rec["configured-sharebytes"])
3847             # without the .st_blocks field in os.stat() results, we should be
3848             # reporting diskbytes==sharebytes
3849             self.failUnlessEqual(rec["configured-sharebytes"],
3850                                  rec["configured-diskbytes"])
3851         d.addCallback(_check)
3852         return d
3853
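     # A sketch of the stat()-based accounting this test exercises
     # (hypothetical helper; the server's real logic differs in detail):
     def _sketch_disk_usage(self, path):
         s = os.stat(path)
         if hasattr(s, "st_blocks"):
             # st_blocks counts 512-byte units on platforms that provide it
             return s.st_blocks * 512
         return s.st_size # no st_blocks: report diskbytes==sharebytes
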
3854     def test_share_corruption(self):
3855         self._poll_should_ignore_these_errors = [
3856             UnknownMutableContainerVersionError,
3857             UnknownImmutableContainerVersionError,
3858             ]
3859         basedir = "storage/LeaseCrawler/share_corruption"
3860         fileutil.make_dirs(basedir)
3861         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3862         w = StorageStatus(ss)
3863         # make it start sooner than usual.
3864         lc = ss.lease_checker
3865         lc.stop_after_first_bucket = True
3866         lc.slow_start = 0
3867         lc.cpu_slice = 500
3868
3869         # create a few shares, with some leases on them
3870         self.make_shares(ss)
3871
3872         # now corrupt one, and make sure the lease-checker keeps going
3873         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3874         first = min(self.sis)
3875         first_b32 = base32.b2a(first)
3876         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3877         f = open(fn, "rb+")
3878         f.seek(0)
3879         f.write("BAD MAGIC")
3880         f.close()
3881         # if get_share_file() doesn't see the correct mutable magic, it
3882         # assumes the file is an immutable share, and then
3883         # immutable.ShareFile sees a bad version. So regardless of which kind
3884         # of share we corrupted, this will trigger an
3885         # UnknownImmutableContainerVersionError.
3886
3887         # also create an empty bucket
3888         empty_si = base32.b2a("\x04"*16)
3889         empty_bucket_dir = os.path.join(ss.sharedir,
3890                                         storage_index_to_dir(empty_si))
3891         fileutil.make_dirs(empty_bucket_dir)
3892
3893         ss.setServiceParent(self.s)
3894
3895         d = fireEventually()
3896
3897         # now examine the state right after the first bucket has been
3898         # processed.
3899         def _after_first_bucket(ignored):
3900             s = lc.get_state()
3901             if "cycle-to-date" not in s:
3902                 d2 = fireEventually()
3903                 d2.addCallback(_after_first_bucket)
3904                 return d2
3905             so_far = s["cycle-to-date"]
3906             rec = so_far["space-recovered"]
3907             self.failUnlessEqual(rec["examined-buckets"], 1)
3908             self.failUnlessEqual(rec["examined-shares"], 0)
3909             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3910         d.addCallback(_after_first_bucket)
3911
3912         d.addCallback(lambda ign: self.render_json(w))
3913         def _check_json(json):
3914             data = simplejson.loads(json)
3915             # grr. json turns all dict keys into strings.
3916             so_far = data["lease-checker"]["cycle-to-date"]
3917             corrupt_shares = so_far["corrupt-shares"]
3918             # it also turns all tuples into lists
3919             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3920         d.addCallback(_check_json)
3921         d.addCallback(lambda ign: self.render1(w))
3922         def _check_html(html):
3923             s = remove_tags(html)
3924             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3925         d.addCallback(_check_html)
3926
3927         def _wait():
3928             return lc.get_state()["last-cycle-finished"] is not None
3929         d.addCallback(lambda ign: self.poll(_wait))
3930
3931         def _after_first_cycle(ignored):
3932             s = lc.get_state()
3933             last = s["history"][0]
3934             rec = last["space-recovered"]
3935             self.failUnlessEqual(rec["examined-buckets"], 5)
3936             self.failUnlessEqual(rec["examined-shares"], 3)
3937             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3938         d.addCallback(_after_first_cycle)
3939         d.addCallback(lambda ign: self.render_json(w))
3940         def _check_json_history(json):
3941             data = simplejson.loads(json)
3942             last = data["lease-checker"]["history"]["0"]
3943             corrupt_shares = last["corrupt-shares"]
3944             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3945         d.addCallback(_check_json_history)
3946         d.addCallback(lambda ign: self.render1(w))
3947         def _check_html_history(html):
3948             s = remove_tags(html)
3949             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3950         d.addCallback(_check_html_history)
3951
3952         def _cleanup(res):
3953             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3954                                    UnknownImmutableContainerVersionError)
3955             return res
3956         d.addBoth(_cleanup)
3957         return d
3958
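     # A sketch of the magic-number sniffing described in the comments of
     # test_share_corruption (hypothetical helper; assumes
     # MutableShareFile.MAGIC is the mutable container's on-disk magic):
     def _sketch_share_kind(self, filename):
         f = open(filename, "rb")
         header = f.read(32)
         f.close()
         if header.startswith(MutableShareFile.MAGIC):
             return "mutable"
         # anything else is treated as immutable, so a corrupted header
         # surfaces as UnknownImmutableContainerVersionError
         return "immutable"
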
3959     def render_json(self, page):
3960         d = self.render1(page, args={"t": ["json"]})
3961         return d
3962
3963 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3964
3965     def setUp(self):
3966         self.s = service.MultiService()
3967         self.s.startService()
3968     def tearDown(self):
3969         return self.s.stopService()
3970
3971     def test_no_server(self):
3972         w = StorageStatus(None)
3973         html = w.renderSynchronously()
3974         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3975
3976     def test_status(self):
3977         basedir = "storage/WebStatus/status"
3978         fileutil.make_dirs(basedir)
3979         ss = StorageServer(basedir, "\x00" * 20)
3980         ss.setServiceParent(self.s)
3981         w = StorageStatus(ss)
3982         d = self.render1(w)
3983         def _check_html(html):
3984             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3985             s = remove_tags(html)
3986             self.failUnlessIn("Accepting new shares: Yes", s)
3987             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3988         d.addCallback(_check_html)
3989         d.addCallback(lambda ign: self.render_json(w))
3990         def _check_json(json):
3991             data = simplejson.loads(json)
3992             s = data["stats"]
3993             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3994             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3995             self.failUnlessIn("bucket-counter", data)
3996             self.failUnlessIn("lease-checker", data)
3997         d.addCallback(_check_json)
3998         return d
3999
4000     def render_json(self, page):
4001         d = self.render1(page, args={"t": ["json"]})
4002         return d
4003
4004     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4005     def test_status_no_disk_stats(self, mock_get_disk_stats):
4006         mock_get_disk_stats.side_effect = AttributeError()
4007
4008         # Some platforms may have no disk stats API. Make sure the code can handle that
4009         # (test runs on all platforms).
4010         basedir = "storage/WebStatus/status_no_disk_stats"
4011         fileutil.make_dirs(basedir)
4012         ss = StorageServer(basedir, "\x00" * 20)
4013         ss.setServiceParent(self.s)
4014         w = StorageStatus(ss)
4015         html = w.renderSynchronously()
4016         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4017         s = remove_tags(html)
4018         self.failUnlessIn("Accepting new shares: Yes", s)
4019         self.failUnlessIn("Total disk space: ?", s)
4020         self.failUnlessIn("Space Available to Tahoe: ?", s)
4021         self.failUnless(ss.get_available_space() is None)
4022
4023     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4024     def test_status_bad_disk_stats(self, mock_get_disk_stats):
4025         mock_get_disk_stats.side_effect = OSError()
4026
4027         # If the API to get disk stats exists but a call to it fails, then the status should
4028         # show that no shares will be accepted, and get_available_space() should be 0.
4029         basedir = "storage/WebStatus/status_bad_disk_stats"
4030         fileutil.make_dirs(basedir)
4031         ss = StorageServer(basedir, "\x00" * 20)
4032         ss.setServiceParent(self.s)
4033         w = StorageStatus(ss)
4034         html = w.renderSynchronously()
4035         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4036         s = remove_tags(html)
4037         self.failUnlessIn("Accepting new shares: No", s)
4038         self.failUnlessIn("Total disk space: ?", s)
4039         self.failUnlessIn("Space Available to Tahoe: ?", s)
4040         self.failUnlessEqual(ss.get_available_space(), 0)
4041
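     # The two tests above pin down the fallback policy for disk stats. A
     # sketch of it (hypothetical helper; the real logic lives in fileutil
     # and StorageServer):
     def _sketch_available_space(self, sharedir, reserved_space):
         try:
             return fileutil.get_disk_stats(sharedir, reserved_space)["avail"]
         except AttributeError:
             return None # no disk-stats API on this platform: unknown
         except EnvironmentError:
             return 0 # the API exists but failed: fail safe, accept nothing
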
4042     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4043     def test_status_right_disk_stats(self, mock_get_disk_stats):
4044         GB = 1000000000
4045         total            = 5*GB
4046         free_for_root    = 4*GB
4047         free_for_nonroot = 3*GB
4048         reserved_space   = 1*GB
4049         used = total - free_for_root
4050         avail = max(free_for_nonroot - reserved_space, 0)
4051         mock_get_disk_stats.return_value = {
4052             'total': total,
4053             'free_for_root': free_for_root,
4054             'free_for_nonroot': free_for_nonroot,
4055             'used': used,
4056             'avail': avail,
4057         }
4058
4059         basedir = "storage/WebStatus/status_right_disk_stats"
4060         fileutil.make_dirs(basedir)
4061         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4062         expecteddir = ss.sharedir
4063         ss.setServiceParent(self.s)
4064         w = StorageStatus(ss)
4065         html = w.renderSynchronously()
4066
4067         for args in mock_get_disk_stats.call_args_list:
4068             self.failUnlessEqual(args, ((expecteddir, reserved_space), {}))
4069
4070         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4071         s = remove_tags(html)
4072         self.failUnlessIn("Total disk space: 5.00 GB", s)
4073         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4074         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4075         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4076         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4077         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4078         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4079
4080     def test_readonly(self):
4081         basedir = "storage/WebStatus/readonly"
4082         fileutil.make_dirs(basedir)
4083         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4084         ss.setServiceParent(self.s)
4085         w = StorageStatus(ss)
4086         html = w.renderSynchronously()
4087         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4088         s = remove_tags(html)
4089         self.failUnlessIn("Accepting new shares: No", s)
4090
4091     def test_reserved(self):
4092         basedir = "storage/WebStatus/reserved"
4093         fileutil.make_dirs(basedir)
4094         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4095         ss.setServiceParent(self.s)
4096         w = StorageStatus(ss)
4097         html = w.renderSynchronously()
4098         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4099         s = remove_tags(html)
4100         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4101
4102     def test_huge_reserved(self):
4103         basedir = "storage/WebStatus/huge_reserved"
4104         fileutil.make_dirs(basedir)
4105         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15000000000)
4106         ss.setServiceParent(self.s)
4107         w = StorageStatus(ss)
4108         html = w.renderSynchronously()
4109         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4110         s = remove_tags(html)
4111         self.failUnlessIn("Reserved space: - 15.00 GB (15000000000)", s)
4112
4113     def test_util(self):
4114         w = StorageStatus(None)
4115         self.failUnlessEqual(w.render_space(None, None), "?")
4116         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4117         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4118         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4119         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4120         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
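
     # A sketch of the decimal abbreviation render_abbrev_space performs
     # (hypothetical helper; the real one lives in allmydata.util.abbreviate):
     def _sketch_abbrev_space(self, size):
         if size is None:
             return "?"
         for threshold, unit in [(1e9, "GB"), (1e6, "MB"), (1e3, "kB")]:
             if size >= threshold:
                 return "%.2f %s" % (size / threshold, unit)
         return "%d B" % size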