import time, os.path, platform, stat, re, simplejson, struct, shutil

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
                                     SIGNED_PREFIX, MDMFHEADER, \
                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
                                     PRIVATE_KEY_SIZE, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
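# FakeCanary stands in for the foolscap "canary" object that clients pass to
# remote_allocate_buckets: the server registers disconnect callbacks on it,
# and tests such as test_disconnect below fire those callbacks by hand to
# simulate a client going away.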
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
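        # The final LeaseInfo argument is the storage server's node id (20
        # bytes; all zeroes here, matching the "\x00"*20 test servers below).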
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data
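        # Worked size check: 12 bytes of container header ('>LLL') + 1 byte
        # of share data + 72 bytes of lease (4 + 32 + 32 + 4) = 85 bytes,
        # matching the total quoted above.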

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        mockstorageserver = mock.Mock()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)

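# RemoteBucket simulates the foolscap RemoteReference for a bucket: it
# dispatches callRemote("foo", ...) to self.target.remote_foo(...) and counts
# reads and writes so tests can assert on the number of remote round trips.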
class RemoteBucket:

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
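        # Term by term: header + 100 bytes of share data + three 7-node hash
        # trees at 32 bytes/node (presumably the unused plaintext_hash_tree
        # slot in the layout, plus crypttext_hash_tree and block_hashes) +
        # 3 share-hash-chain entries of (2-byte share number + 32-byte hash)
        # + a 4-byte length field and up to 500 bytes of URI extension.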

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
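        # The reader's repr names its class and includes the base32-encoded
        # storage index ("mfwgy33dmf2g" is base32 of "allocate") plus the
        # share number.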
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
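        # OVERHEAD is the share file's three 4-byte header fields; LEASE_SIZE
        # is one lease record (4-byte owner number + 32-byte renew secret +
        # 32-byte cancel secret + 4-byte expiration time), the same layout
        # built by hand in test_read_past_end_of_share_data above.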
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation, which grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually update the mocked disk stats, since we're not
        # doing real disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.failIf(hasattr(ss, 'remote_cancel_lease'), \
                        "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used by other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
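        # Each per-share entry is (test vector, write vector, new_length);
        # an empty test vector matches unconditionally, so this just creates
        # empty shares.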
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # Trying to make the container too large (by sending a write vector
        # whose offset is too high) will raise an exception.
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(TOOBIG,data)], None)},
                              [])

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], None)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:10]]})

        # Sending a new_length shorter than the current length truncates the
        # data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 9)},
                         [])
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a new_length longer than the current length doesn't change
        # the data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        read_answer = read("si1", [0], [(0, 20)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a write vector whose start is after the end of the current
        # data doesn't reveal "whatever was there last time" (palimpsest),
        # but instead fills with zeroes.

        # To test this, we fill the data area with a recognizable pattern.
        pattern = ''.join([chr(i) for i in range(100)])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0, pattern)], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Then truncate the data...
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Just confirm that you get an empty string if you try to read from
        # past the (new) endpoint now.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 1980)])
        self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))

        # Then extend the file by writing a vector which starts out past
        # the end...
        answer = rstaraw("si1", secrets,
                         {0: ([], [(50, 'hellothere')], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Now if you read the stuff between 20 (where we earlier truncated)
        # and 50, it had better be all zeroes.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 30)])
        self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))

        # Also see if the server explicitly declares that it supports this
        # feature.
        ver = ss.remote_get_version()
        storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
        self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))

        # If the size is dropped to zero the share is deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

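        # Test vectors are (offset, length, operator, specimen) tuples; the
        # write vectors are applied only if every test vector matches the
        # data currently in the share.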
        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
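        # data[10:15] == "11111" (ten "0"s, then ten "1"s, and so on), which
        # is the 5-byte span every test vector below examines.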
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

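    # Renewal rewrites a lease's expiration_time, so tests that only care
    # about lease identity use the timestamp-free comparison below.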
    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
1283         # renewing with a bogus token should prompt an error message
1284
1285         # examine the exception thus raised, make sure the old nodeid is
1286         # present, to provide for share migration
1287         e = self.failUnlessRaises(IndexError,
1288                                   ss.remote_renew_lease, "si1",
1289                                   secrets(20)[1])
1290         e_s = str(e)
1291         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1292         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1293         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1294
1295         self.compare_leases(all_leases, list(s0.get_leases()))
1296
1297         # reading shares should not modify the timestamp
1298         read("si1", [], [(0,200)])
1299         self.compare_leases(all_leases, list(s0.get_leases()))
1300
1301         write("si1", secrets(0),
1302               {0: ([], [(200, "make me bigger")], None)}, [])
1303         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1304
1305         write("si1", secrets(0),
1306               {0: ([], [(500, "make me really bigger")], None)}, [])
1307         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1308
1309     def test_remove(self):
1310         ss = self.create("test_remove")
1311         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1312                       set([0,1,2]), 100)
1313         readv = ss.remote_slot_readv
1314         writev = ss.remote_slot_testv_and_readv_and_writev
1315         secrets = ( self.write_enabler("we1"),
1316                     self.renew_secret("we1"),
1317                     self.cancel_secret("we1") )
1318         # delete sh0 by setting its size to zero
1319         answer = writev("si1", secrets,
1320                         {0: ([], [], 0)},
1321                         [])
1322         # the answer should mention all the shares that existed before the
1323         # write
1324         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1325         # but a new read should show only sh1 and sh2
1326         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1327                              {1: [""], 2: [""]})
1328
1329         # delete sh1 by setting its size to zero
1330         answer = writev("si1", secrets,
1331                         {1: ([], [], 0)},
1332                         [])
1333         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1334         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1335                              {2: [""]})
1336
1337         # delete sh2 by setting its size to zero
1338         answer = writev("si1", secrets,
1339                         {2: ([], [], 0)},
1340                         [])
1341         self.failUnlessEqual(answer, (True, {2:[]}) )
1342         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1343                              {})
1344         # and the bucket directory should now be gone
1345         si = base32.b2a("si1")
1346         # note: this is a detail of the storage server implementation, and
1347         # may change in the future
1348         prefix = si[:2]
1349         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1350         bucketdir = os.path.join(prefixdir, si)
1351         self.failUnless(os.path.exists(prefixdir), prefixdir)
1352         self.failIf(os.path.exists(bucketdir), bucketdir)
1353
1354
1355 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1356     def setUp(self):
1357         self.sparent = LoggingServiceParent()
1358         self._lease_secret = itertools.count()
1359         self.ss = self.create("MDMFProxies storage test server")
1360         self.rref = RemoteBucket()
1361         self.rref.target = self.ss
1362         self.secrets = (self.write_enabler("we_secret"),
1363                         self.renew_secret("renew_secret"),
1364                         self.cancel_secret("cancel_secret"))
1365         self.segment = "aaaaaa"
1366         self.block = "aa"
1367         self.salt = "a" * 16
1368         self.block_hash = "a" * 32
1369         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1370         self.share_hash = self.block_hash
1371         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1372         self.signature = "foobarbaz"
1373         self.verification_key = "vvvvvv"
1374         self.encprivkey = "private"
1375         self.root_hash = self.block_hash
1376         self.salt_hash = self.root_hash
1377         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1378         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1379         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1380         # blockhashes and salt hashes are serialized in the same way,
1381         # only we lop off the first element and store that in the
1382         # header.
1383         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
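        # e.g. six 32-byte hashes serialize to 6 * 32 = 192 bytes, but
        # salt_hash_tree_s holds only hashes 1..5 (5 * 32 = 160 bytes),
        # since hash 0 is stored in the header instead.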
1384
1385
1386     def tearDown(self):
1387         self.sparent.stopService()
1388         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1389
1390
1391     def write_enabler(self, we_tag):
1392         return hashutil.tagged_hash("we_blah", we_tag)
1393
1394
1395     def renew_secret(self, tag):
1396         return hashutil.tagged_hash("renew_blah", str(tag))
1397
1398
1399     def cancel_secret(self, tag):
1400         return hashutil.tagged_hash("cancel_blah", str(tag))
1401
1402
1403     def workdir(self, name):
1404         basedir = os.path.join("storage", "MutableServer", name)
1405         return basedir
1406
1407
1408     def create(self, name):
1409         workdir = self.workdir(name)
1410         ss = StorageServer(workdir, "\x00" * 20)
1411         ss.setServiceParent(self.sparent)
1412         return ss
1413
1414
1415     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1416         # Start with the checkstring
1417         data = struct.pack(">BQ32s",
1418                            1,
1419                            0,
1420                            self.root_hash)
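        # ">BQ32s" packs a 1-byte version number (1 = MDMF), an 8-byte
        # sequence number, and the 32-byte root hash: 41 bytes in all.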
1421         self.checkstring = data
1422         # Next, the encoding parameters
1423         if tail_segment:
1424             data += struct.pack(">BBQQ",
1425                                 3,
1426                                 10,
1427                                 6,
1428                                 33)
1429         elif empty:
1430             data += struct.pack(">BBQQ",
1431                                 3,
1432                                 10,
1433                                 0,
1434                                 0)
1435         else:
1436             data += struct.pack(">BBQQ",
1437                                 3,
1438                                 10,
1439                                 6,
1440                                 36)
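        # ">BBQQ" packs the encoding parameters: k=3, N=10, then an 8-byte
        # segment size and an 8-byte data length. datalen=36 means six
        # full 6-byte segments; datalen=33 implies a 3-byte tail segment.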
1441         # Now we'll build the offsets.
1442         sharedata = ""
1443         if not tail_segment and not empty:
1444             for i in xrange(6):
1445                 sharedata += self.salt + self.block
1446         elif tail_segment:
1447             for i in xrange(5):
1448                 sharedata += self.salt + self.block
1449             sharedata += self.salt + "a"
1450
1451         # The encrypted private key comes after the shares + salts
1452         offset_size = struct.calcsize(MDMFOFFSETS)
1453         encrypted_private_key_offset = len(data) + offset_size
1454         # The share hash chain comes after the private key
1455         sharehashes_offset = encrypted_private_key_offset + \
1456             len(self.encprivkey)
1457
1458         # The signature comes after the share hash chain.
1459         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1460
1461         verification_key_offset = signature_offset + len(self.signature)
1462         verification_key_end = verification_key_offset + \
1463             len(self.verification_key)
1464
1465         share_data_offset = offset_size
1466         share_data_offset += PRIVATE_KEY_SIZE
1467         share_data_offset += SIGNATURE_SIZE
1468         share_data_offset += VERIFICATION_KEY_SIZE
1469         share_data_offset += SHARE_HASH_CHAIN_SIZE
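        # share_data_offset is computed from the fixed maximum field sizes
        # (PRIVATE_KEY_SIZE and friends), not from the actual field
        # lengths; the padding added below fills the gap up to it.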
1470
1471         blockhashes_offset = share_data_offset + len(sharedata)
1472         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1473
1474         data += struct.pack(MDMFOFFSETS,
1475                             encrypted_private_key_offset,
1476                             sharehashes_offset,
1477                             signature_offset,
1478                             verification_key_offset,
1479                             verification_key_end,
1480                             share_data_offset,
1481                             blockhashes_offset,
1482                             eof_offset)
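        # The offset table thus describes a share laid out as: header,
        # encrypted private key, share hash chain, signature, verification
        # key, padding up to share_data_offset, share data, block hash
        # tree, then EOF.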
1483
1484         self.offsets = {}
1485         self.offsets['enc_privkey'] = encrypted_private_key_offset
1486         self.offsets['block_hash_tree'] = blockhashes_offset
1487         self.offsets['share_hash_chain'] = sharehashes_offset
1488         self.offsets['signature'] = signature_offset
1489         self.offsets['verification_key'] = verification_key_offset
1490         self.offsets['share_data'] = share_data_offset
1491         self.offsets['verification_key_end'] = verification_key_end
1492         self.offsets['EOF'] = eof_offset
1493
1494         # the private key,
1495         data += self.encprivkey
1496         # the sharehashes
1497         data += self.share_hash_chain_s
1498         # the signature,
1499         data += self.signature
1500         # and the verification key
1501         data += self.verification_key
1502         # Then we'll add padding until we get to share_data_offset.
1503         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1504         data += nulls
1505
1506         # Then the share data
1507         data += sharedata
1508         # the blockhashes
1509         data += self.block_hash_tree_s
1510         return data
1511
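    # For reference, the fixed-size header built above can be unpacked
    # with the same format strings -- a sketch, not used by the tests:
    #
    #   version, seqnum, root_hash = struct.unpack(">BQ32s", data[:41])
    #   k, n, segsize, datalen = struct.unpack(">BBQQ", data[41:59])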
1512
1513     def write_test_share_to_server(self,
1514                                    storage_index,
1515                                    tail_segment=False,
1516                                    empty=False):
1517         """
1518         I write some test data to self.ss for the read tests to read.
1519
1520         If tail_segment=True, then I will write a share that has a
1521         smaller tail segment than other segments.
1522         """
1523         write = self.ss.remote_slot_testv_and_readv_and_writev
1524         data = self.build_test_mdmf_share(tail_segment, empty)
1525         # Finally, we write the whole thing to the storage server in one
1526         # pass.
1527         testvs = [(0, 1, "eq", "")]
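        # The (0, 1, "eq", "") test vector passes only while the share is
        # still empty (reading a missing share yields ""), so this write
        # will not clobber an existing share.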
1528         tws = {}
1529         tws[0] = (testvs, [(0, data)], None)
1530         readv = [(0, 1)]
1531         results = write(storage_index, self.secrets, tws, readv)
1532         self.failUnless(results[0])
1533
1534
1535     def build_test_sdmf_share(self, empty=False):
1536         if empty:
1537             sharedata = ""
1538         else:
1539             sharedata = self.segment * 6
1540         self.sharedata = sharedata
1541         blocksize = len(sharedata) / 3
1542         block = sharedata[:blocksize]
1543         self.blockdata = block
1544         prefix = struct.pack(">BQ32s16s BBQQ",
1545                              0, # version,
1546                              0,
1547                              self.root_hash,
1548                              self.salt,
1549                              3,
1550                              10,
1551                              len(sharedata),
1552                              len(sharedata),
1553                             )
1554         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1555         signature_offset = post_offset + len(self.verification_key)
1556         sharehashes_offset = signature_offset + len(self.signature)
1557         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1558         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1559         encprivkey_offset = sharedata_offset + len(block)
1560         eof_offset = encprivkey_offset + len(self.encprivkey)
1561         offsets = struct.pack(">LLLLQQ",
1562                               signature_offset,
1563                               sharehashes_offset,
1564                               blockhashes_offset,
1565                               sharedata_offset,
1566                               encprivkey_offset,
1567                               eof_offset)
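        # The SDMF share is then assembled in this order: signed prefix,
        # offset table, verification key, signature, share hash chain,
        # block hash tree, share data, and the encrypted private key last.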
1568         final_share = "".join([prefix,
1569                            offsets,
1570                            self.verification_key,
1571                            self.signature,
1572                            self.share_hash_chain_s,
1573                            self.block_hash_tree_s,
1574                            block,
1575                            self.encprivkey])
1576         self.offsets = {}
1577         self.offsets['signature'] = signature_offset
1578         self.offsets['share_hash_chain'] = sharehashes_offset
1579         self.offsets['block_hash_tree'] = blockhashes_offset
1580         self.offsets['share_data'] = sharedata_offset
1581         self.offsets['enc_privkey'] = encprivkey_offset
1582         self.offsets['EOF'] = eof_offset
1583         return final_share
1584
1585
1586     def write_sdmf_share_to_server(self,
1587                                    storage_index,
1588                                    empty=False):
1589         # Some tests need SDMF shares to verify that we can still read
1590         # them. This method writes one, hand-built rather than published.
1591         assert self.rref
1592         write = self.ss.remote_slot_testv_and_readv_and_writev
1593         share = self.build_test_sdmf_share(empty)
1594         testvs = [(0, 1, "eq", "")]
1595         tws = {}
1596         tws[0] = (testvs, [(0, share)], None)
1597         readv = []
1598         results = write(storage_index, self.secrets, tws, readv)
1599         self.failUnless(results[0])
1600
1601
1602     def test_read(self):
1603         self.write_test_share_to_server("si1")
1604         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1605         # Check that every method equals what we expect it to.
1606         d = defer.succeed(None)
1607         def _check_block_and_salt((block, salt)):
1608             self.failUnlessEqual(block, self.block)
1609             self.failUnlessEqual(salt, self.salt)
1610
1611         for i in xrange(6):
1612             d.addCallback(lambda ignored, i=i:
1613                 mr.get_block_and_salt(i))
1614             d.addCallback(_check_block_and_salt)
1615
1616         d.addCallback(lambda ignored:
1617             mr.get_encprivkey())
1618         d.addCallback(lambda encprivkey:
1619             self.failUnlessEqual(self.encprivkey, encprivkey))
1620
1621         d.addCallback(lambda ignored:
1622             mr.get_blockhashes())
1623         d.addCallback(lambda blockhashes:
1624             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1625
1626         d.addCallback(lambda ignored:
1627             mr.get_sharehashes())
1628         d.addCallback(lambda sharehashes:
1629             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1630
1631         d.addCallback(lambda ignored:
1632             mr.get_signature())
1633         d.addCallback(lambda signature:
1634             self.failUnlessEqual(signature, self.signature))
1635
1636         d.addCallback(lambda ignored:
1637             mr.get_verification_key())
1638         d.addCallback(lambda verification_key:
1639             self.failUnlessEqual(verification_key, self.verification_key))
1640
1641         d.addCallback(lambda ignored:
1642             mr.get_seqnum())
1643         d.addCallback(lambda seqnum:
1644             self.failUnlessEqual(seqnum, 0))
1645
1646         d.addCallback(lambda ignored:
1647             mr.get_root_hash())
1648         d.addCallback(lambda root_hash:
1649             self.failUnlessEqual(self.root_hash, root_hash))
1650
1651         d.addCallback(lambda ignored:
1652             mr.get_seqnum())
1653         d.addCallback(lambda seqnum:
1654             self.failUnlessEqual(0, seqnum))
1655
1656         d.addCallback(lambda ignored:
1657             mr.get_encoding_parameters())
1658         def _check_encoding_parameters((k, n, segsize, datalen)):
1659             self.failUnlessEqual(k, 3)
1660             self.failUnlessEqual(n, 10)
1661             self.failUnlessEqual(segsize, 6)
1662             self.failUnlessEqual(datalen, 36)
1663         d.addCallback(_check_encoding_parameters)
1664
1665         d.addCallback(lambda ignored:
1666             mr.get_checkstring())
1667         d.addCallback(lambda checkstring:
1668             self.failUnlessEqual(checkstring, self.checkstring))
1669         return d
1670
1671
1672     def test_read_with_different_tail_segment_size(self):
1673         self.write_test_share_to_server("si1", tail_segment=True)
1674         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1675         d = mr.get_block_and_salt(5)
1676         def _check_tail_segment(results):
1677             block, salt = results
1678             self.failUnlessEqual(len(block), 1)
1679             self.failUnlessEqual(block, "a")
1680         d.addCallback(_check_tail_segment)
1681         return d
1682
1683
1684     def test_get_block_with_invalid_segnum(self):
1685         self.write_test_share_to_server("si1")
1686         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1687         d = defer.succeed(None)
1688         d.addCallback(lambda ignored:
1689             self.shouldFail(LayoutInvalid, "test invalid segnum",
1690                             None,
1691                             mr.get_block_and_salt, 7))
1692         return d
1693
1694
1695     def test_get_encoding_parameters_first(self):
1696         self.write_test_share_to_server("si1")
1697         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1698         d = mr.get_encoding_parameters()
1699         def _check_encoding_parameters((k, n, segment_size, datalen)):
1700             self.failUnlessEqual(k, 3)
1701             self.failUnlessEqual(n, 10)
1702             self.failUnlessEqual(segment_size, 6)
1703             self.failUnlessEqual(datalen, 36)
1704         d.addCallback(_check_encoding_parameters)
1705         return d
1706
1707
1708     def test_get_seqnum_first(self):
1709         self.write_test_share_to_server("si1")
1710         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1711         d = mr.get_seqnum()
1712         d.addCallback(lambda seqnum:
1713             self.failUnlessEqual(seqnum, 0))
1714         return d
1715
1716
1717     def test_get_root_hash_first(self):
1718         self.write_test_share_to_server("si1")
1719         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1720         d = mr.get_root_hash()
1721         d.addCallback(lambda root_hash:
1722             self.failUnlessEqual(root_hash, self.root_hash))
1723         return d
1724
1725
1726     def test_get_checkstring_first(self):
1727         self.write_test_share_to_server("si1")
1728         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1729         d = mr.get_checkstring()
1730         d.addCallback(lambda checkstring:
1731             self.failUnlessEqual(checkstring, self.checkstring))
1732         return d
1733
1734
1735     def test_write_read_vectors(self):
1736         # When writing for us, the storage server will return to us a
1737         # read vector, along with its result. If a write fails because
1738         # the test vectors failed, this read vector can help us to
1739         # diagnose the problem. This test ensures that the read vector
1740         # is working appropriately.
1741         mw = self._make_new_mw("si1", 0)
1742
1743         for i in xrange(6):
1744             mw.put_block(self.block, i, self.salt)
1745         mw.put_encprivkey(self.encprivkey)
1746         mw.put_blockhashes(self.block_hash_tree)
1747         mw.put_sharehashes(self.share_hash_chain)
1748         mw.put_root_hash(self.root_hash)
1749         mw.put_signature(self.signature)
1750         mw.put_verification_key(self.verification_key)
1751         d = mw.finish_publishing()
1752         def _then(results):
1753             self.failUnlessEqual(len(results), 2)
1754             result, readv = results
1755             self.failUnless(result)
1756             self.failIf(readv)
1757             self.old_checkstring = mw.get_checkstring()
1758             mw.set_checkstring("")
1759         d.addCallback(_then)
1760         d.addCallback(lambda ignored:
1761             mw.finish_publishing())
1762         def _then_again(results):
1763             self.failUnlessEqual(len(results), 2)
1764             result, readvs = results
1765             self.failIf(result)
1766             self.failUnlessIn(0, readvs)
1767             readv = readvs[0][0]
1768             self.failUnlessEqual(readv, self.old_checkstring)
1769         d.addCallback(_then_again)
1770         # The checkstring remains the same for the rest of the process.
1771         return d
1772
1773
1774     def test_private_key_after_share_hash_chain(self):
1775         mw = self._make_new_mw("si1", 0)
1776         d = defer.succeed(None)
1777         for i in xrange(6):
1778             d.addCallback(lambda ignored, i=i:
1779                 mw.put_block(self.block, i, self.salt))
1780         d.addCallback(lambda ignored:
1781             mw.put_encprivkey(self.encprivkey))
1782         d.addCallback(lambda ignored:
1783             mw.put_sharehashes(self.share_hash_chain))
1784
1785         # Now try to put the private key again.
1786         d.addCallback(lambda ignored:
1787             self.shouldFail(LayoutInvalid, "test repeat private key",
1788                             None,
1789                             mw.put_encprivkey, self.encprivkey))
1790         return d
1791
1792
1793     def test_signature_after_verification_key(self):
1794         mw = self._make_new_mw("si1", 0)
1795         d = defer.succeed(None)
1796         # Put everything up to and including the verification key.
1797         for i in xrange(6):
1798             d.addCallback(lambda ignored, i=i:
1799                 mw.put_block(self.block, i, self.salt))
1800         d.addCallback(lambda ignored:
1801             mw.put_encprivkey(self.encprivkey))
1802         d.addCallback(lambda ignored:
1803             mw.put_blockhashes(self.block_hash_tree))
1804         d.addCallback(lambda ignored:
1805             mw.put_sharehashes(self.share_hash_chain))
1806         d.addCallback(lambda ignored:
1807             mw.put_root_hash(self.root_hash))
1808         d.addCallback(lambda ignored:
1809             mw.put_signature(self.signature))
1810         d.addCallback(lambda ignored:
1811             mw.put_verification_key(self.verification_key))
1812         # Now try to put the signature again. This should fail
1813         d.addCallback(lambda ignored:
1814             self.shouldFail(LayoutInvalid, "signature after verification",
1815                             None,
1816                             mw.put_signature, self.signature))
1817         return d
1818
1819
1820     def test_uncoordinated_write(self):
1821         # Make two mutable writers, both pointing to the same storage
1822         # server, both at the same storage index, and try writing to the
1823         # same share.
1824         mw1 = self._make_new_mw("si1", 0)
1825         mw2 = self._make_new_mw("si1", 0)
1826
1827         def _check_success(results):
1828             result, readvs = results
1829             self.failUnless(result)
1830
1831         def _check_failure(results):
1832             result, readvs = results
1833             self.failIf(result)
1834
1835         def _write_share(mw):
1836             for i in xrange(6):
1837                 mw.put_block(self.block, i, self.salt)
1838             mw.put_encprivkey(self.encprivkey)
1839             mw.put_blockhashes(self.block_hash_tree)
1840             mw.put_sharehashes(self.share_hash_chain)
1841             mw.put_root_hash(self.root_hash)
1842             mw.put_signature(self.signature)
1843             mw.put_verification_key(self.verification_key)
1844             return mw.finish_publishing()
1845         d = _write_share(mw1)
1846         d.addCallback(_check_success)
1847         d.addCallback(lambda ignored:
1848             _write_share(mw2))
1849         d.addCallback(_check_failure)
1850         return d
1851
1852
1853     def test_invalid_salt_size(self):
1854         # Salts need to be 16 bytes in size. Writes that attempt to
1855         # write more or less than this should be rejected.
1856         mw = self._make_new_mw("si1", 0)
1857         invalid_salt = "a" * 17 # 17 bytes
1858         another_invalid_salt = "b" * 15 # 15 bytes
1859         d = defer.succeed(None)
1860         d.addCallback(lambda ignored:
1861             self.shouldFail(LayoutInvalid, "salt too big",
1862                             None,
1863                             mw.put_block, self.block, 0, invalid_salt))
1864         d.addCallback(lambda ignored:
1865             self.shouldFail(LayoutInvalid, "salt too small",
1866                             None,
1867                             mw.put_block, self.block, 0,
1868                             another_invalid_salt))
1869         return d
1870
1871
1872     def test_write_test_vectors(self):
1873         # If we give the write proxy a bogus test vector at 
1874         # any point during the process, it should fail to write when we 
1875         # tell it to write.
1876         def _check_failure(results):
1877             self.failUnlessEqual(len(results), 2)
1878             res, readv = results
1879             self.failIf(res)
1880
1881         def _check_success(results):
1882             self.failUnlessEqual(len(results), 2)
1883             res, readv = results
1884             self.failUnless(res)
1885
1886         mw = self._make_new_mw("si1", 0)
1887         mw.set_checkstring("this is a lie")
1888         for i in xrange(6):
1889             mw.put_block(self.block, i, self.salt)
1890         mw.put_encprivkey(self.encprivkey)
1891         mw.put_blockhashes(self.block_hash_tree)
1892         mw.put_sharehashes(self.share_hash_chain)
1893         mw.put_root_hash(self.root_hash)
1894         mw.put_signature(self.signature)
1895         mw.put_verification_key(self.verification_key)
1896         d = mw.finish_publishing()
1897         d.addCallback(_check_failure)
1898         d.addCallback(lambda ignored:
1899             mw.set_checkstring(""))
1900         d.addCallback(lambda ignored:
1901             mw.finish_publishing())
1902         d.addCallback(_check_success)
1903         return d
1904
1905
1906     def serialize_blockhashes(self, blockhashes):
1907         return "".join(blockhashes)
1908
1909
1910     def serialize_sharehashes(self, sharehashes):
1911         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1912                         for i in sorted(sharehashes.keys())])
1913         return ret
1914
1915
1916     def test_write(self):
1917         # This translates to a file with 6 6-byte segments, and with 2-byte
1918         # blocks.
1919         mw = self._make_new_mw("si1", 0)
1920         # Test writing some blocks.
1921         read = self.ss.remote_slot_readv
1922         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1923         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1924                                     PRIVATE_KEY_SIZE + \
1925                                     SIGNATURE_SIZE + \
1926                                     VERIFICATION_KEY_SIZE + \
1927                                     SHARE_HASH_CHAIN_SIZE
1928         written_block_size = 2 + len(self.salt)
1929         written_block = self.block + self.salt
1930         for i in xrange(6):
1931             mw.put_block(self.block, i, self.salt)
1932
1933         mw.put_encprivkey(self.encprivkey)
1934         mw.put_blockhashes(self.block_hash_tree)
1935         mw.put_sharehashes(self.share_hash_chain)
1936         mw.put_root_hash(self.root_hash)
1937         mw.put_signature(self.signature)
1938         mw.put_verification_key(self.verification_key)
1939         d = mw.finish_publishing()
1940         def _check_publish(results):
1941             self.failUnlessEqual(len(results), 2)
1942             result, ign = results
1943             self.failUnless(result, "publish failed")
1944             for i in xrange(6):
1945                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1946                                 {0: [written_block]})
1947
1948             self.failUnlessEqual(len(self.encprivkey), 7)
1949             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1950                                  {0: [self.encprivkey]})
1951
1952             expected_block_hash_offset = expected_sharedata_offset + \
1953                         (6 * written_block_size)
1954             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1955             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1956                                  {0: [self.block_hash_tree_s]})
1957
1958             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
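            # each chain entry was packed as ">H32s" (a 2-byte share
            # number plus a 32-byte hash), so six entries span
            # (32 + 2) * 6 = 204 bytes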
1959             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1960                                  {0: [self.share_hash_chain_s]})
1961
1962             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1963                                  {0: [self.root_hash]})
1964             expected_signature_offset = expected_share_hash_offset + \
1965                 len(self.share_hash_chain_s)
1966             self.failUnlessEqual(len(self.signature), 9)
1967             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1968                                  {0: [self.signature]})
1969
1970             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1971             self.failUnlessEqual(len(self.verification_key), 6)
1972             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1973                                  {0: [self.verification_key]})
1974
1975             signable = mw.get_signable()
1976             verno, seq, roothash, k, n, segsize, datalen = \
1977                                             struct.unpack(">BQ32sBBQQ",
1978                                                           signable)
1979             self.failUnlessEqual(verno, 1)
1980             self.failUnlessEqual(seq, 0)
1981             self.failUnlessEqual(roothash, self.root_hash)
1982             self.failUnlessEqual(k, 3)
1983             self.failUnlessEqual(n, 10)
1984             self.failUnlessEqual(segsize, 6)
1985             self.failUnlessEqual(datalen, 36)
1986             expected_eof_offset = expected_block_hash_offset + \
1987                 len(self.block_hash_tree_s)
1988
1989             # Check the version number to make sure that it is correct.
1990             expected_version_number = struct.pack(">B", 1)
1991             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1992                                  {0: [expected_version_number]})
1993             # Check the sequence number to make sure that it is correct
1994             expected_sequence_number = struct.pack(">Q", 0)
1995             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1996                                  {0: [expected_sequence_number]})
1997             # Check that the encoding parameters (k, N, segment size, data
1998             # length) are what they should be. These are 3, 10, 6, and 36.
1999             expected_k = struct.pack(">B", 3)
2000             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2001                                  {0: [expected_k]})
2002             expected_n = struct.pack(">B", 10)
2003             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2004                                  {0: [expected_n]})
2005             expected_segment_size = struct.pack(">Q", 6)
2006             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2007                                  {0: [expected_segment_size]})
2008             expected_data_length = struct.pack(">Q", 36)
2009             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2010                                  {0: [expected_data_length]})
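            # The offset table begins right after the fixed header fields
            # (1 + 8 + 32 + 1 + 1 + 8 + 8 = 59 bytes): eight 8-byte
            # big-endian offsets at positions 59, 67, 75, 83, 91, 99, 107,
            # and 115, each checked in turn below.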
2011             expected_offset = struct.pack(">Q", expected_private_key_offset)
2012             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2013                                  {0: [expected_offset]})
2014             expected_offset = struct.pack(">Q", expected_share_hash_offset)
2015             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2016                                  {0: [expected_offset]})
2017             expected_offset = struct.pack(">Q", expected_signature_offset)
2018             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2019                                  {0: [expected_offset]})
2020             expected_offset = struct.pack(">Q", expected_verification_key_offset)
2021             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2022                                  {0: [expected_offset]})
2023             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2024             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2025                                  {0: [expected_offset]})
2026             expected_offset = struct.pack(">Q", expected_sharedata_offset)
2027             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2028                                  {0: [expected_offset]})
2029             expected_offset = struct.pack(">Q", expected_block_hash_offset)
2030             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2031                                  {0: [expected_offset]})
2032             expected_offset = struct.pack(">Q", expected_eof_offset)
2033             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2034                                  {0: [expected_offset]})
2035         d.addCallback(_check_publish)
2036         return d
2037
2038     def _make_new_mw(self, si, share, datalength=36):
2039         # This is a file of size 36 bytes. Since it has a segment
2040         # size of 6, we know that it has 6 byte segments, which will
2041         # be split into blocks of 2 bytes because our FEC k
2042         # parameter is 3.
2043         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2044                                 6, datalength)
2045         return mw
2046
2047
2048     def test_write_rejected_with_too_many_blocks(self):
2049         mw = self._make_new_mw("si0", 0)
2050
2051         # Try writing too many blocks. We should not be able to write
2052         # more than 6 blocks into each share, since the share has only
2053         # 6 segments (block indices 0 through 5).
2054         d = defer.succeed(None)
2055         for i in xrange(6):
2056             d.addCallback(lambda ignored, i=i:
2057                 mw.put_block(self.block, i, self.salt))
2058         d.addCallback(lambda ignored:
2059             self.shouldFail(LayoutInvalid, "too many blocks",
2060                             None,
2061                             mw.put_block, self.block, 7, self.salt))
2062         return d
2063
2064
2065     def test_write_rejected_with_invalid_salt(self):
2066         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2067         # less should cause an error.
2068         mw = self._make_new_mw("si1", 0)
2069         bad_salt = "a" * 17 # 17 bytes
2070         d = defer.succeed(None)
2071         d.addCallback(lambda ignored:
2072             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2073                             None, mw.put_block, self.block, 7, bad_salt))
2074         return d
2075
2076
2077     def test_write_rejected_with_invalid_root_hash(self):
2078         # Try writing an invalid root hash. This should be SHA256d, and
2079         # 32 bytes long as a result.
2080         mw = self._make_new_mw("si2", 0)
2081         # 17 bytes != 32 bytes
2082         invalid_root_hash = "a" * 17
2083         d = defer.succeed(None)
2084         # Before this test can work, we need to put some blocks + salts,
2085         # a block hash tree, and a share hash tree. Otherwise, we'll see
2086         # failures that match what we are looking for, but are caused by
2087         # the constraints imposed on operation ordering.
2088         for i in xrange(6):
2089             d.addCallback(lambda ignored, i=i:
2090                 mw.put_block(self.block, i, self.salt))
2091         d.addCallback(lambda ignored:
2092             mw.put_encprivkey(self.encprivkey))
2093         d.addCallback(lambda ignored:
2094             mw.put_blockhashes(self.block_hash_tree))
2095         d.addCallback(lambda ignored:
2096             mw.put_sharehashes(self.share_hash_chain))
2097         d.addCallback(lambda ignored:
2098             self.shouldFail(LayoutInvalid, "invalid root hash",
2099                             None, mw.put_root_hash, invalid_root_hash))
2100         return d
2101
2102
2103     def test_write_rejected_with_invalid_blocksize(self):
2104         # The blocksize implied by the writer that we get from
2105         # _make_new_mw is 2 bytes -- any more or any less than this
2106         # should cause a failure, unless it is the tail segment, in
2107         # which case a shorter block is allowed.
2108         invalid_block = "a"
2109         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2110                                              # one byte blocks
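        # (33 bytes = 5 full 6-byte segments + a 3-byte tail; with k=3
        # the tail encodes into 1-byte blocks.)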
2111         # 1 byte != 2 bytes
2112         d = defer.succeed(None)
2113         d.addCallback(lambda ignored, invalid_block=invalid_block:
2114             self.shouldFail(LayoutInvalid, "test blocksize too small",
2115                             None, mw.put_block, invalid_block, 0,
2116                             self.salt))
2117         invalid_block = invalid_block * 3
2118         # 3 bytes != 2 bytes
2119         d.addCallback(lambda ignored:
2120             self.shouldFail(LayoutInvalid, "test blocksize too large",
2121                             None,
2122                             mw.put_block, invalid_block, 0, self.salt))
2123         for i in xrange(5):
2124             d.addCallback(lambda ignored, i=i:
2125                 mw.put_block(self.block, i, self.salt))
2126         # Try to put an invalid tail segment
2127         d.addCallback(lambda ignored:
2128             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2129                             None,
2130                             mw.put_block, self.block, 5, self.salt))
2131         valid_block = "a"
2132         d.addCallback(lambda ignored:
2133             mw.put_block(valid_block, 5, self.salt))
2134         return d
2135
2136
2137     def test_write_enforces_order_constraints(self):
2138         # We require that the MDMFSlotWriteProxy be interacted with in a
2139         # specific way.
2140         # That way is:
2141         # 0: __init__
2142         # 1: write blocks and salts
2143         # 2: Write the encrypted private key
2144         # 3: Write the block hashes
2145         # 4: Write the share hashes
2146         # 5: Write the root hash and salt hash
2147         # 6: Write the signature and verification key
2148         # 7: Write the file.
2149         # 
2150         # Some of these can be performed out-of-order, and some can't.
2151         # The dependencies that I want to test here are:
2152         #  - Private key before block hashes
2153         #  - share hashes and block hashes before root hash
2154         #  - root hash before signature
2155         #  - signature before verification key
2156         mw0 = self._make_new_mw("si0", 0)
2157         # Write some shares
2158         d = defer.succeed(None)
2159         for i in xrange(6):
2160             d.addCallback(lambda ignored, i=i:
2161                 mw0.put_block(self.block, i, self.salt))
2162
2163         # Try to write the share hash chain without writing the
2164         # encrypted private key
2165         d.addCallback(lambda ignored:
2166             self.shouldFail(LayoutInvalid, "share hash chain before "
2167                                            "private key",
2168                             None,
2169                             mw0.put_sharehashes, self.share_hash_chain))
2170         # Write the private key.
2171         d.addCallback(lambda ignored:
2172             mw0.put_encprivkey(self.encprivkey))
2173
2174         # Now write the block hashes and try again
2175         d.addCallback(lambda ignored:
2176             mw0.put_blockhashes(self.block_hash_tree))
2177
2178         # We haven't yet put the root hash on the share, so we shouldn't
2179         # be able to sign it.
2180         d.addCallback(lambda ignored:
2181             self.shouldFail(LayoutInvalid, "signature before root hash",
2182                             None, mw0.put_signature, self.signature))
2183
2184         d.addCallback(lambda ignored:
2185             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2186
2187         # ...and, since that fails, we also shouldn't be able to put the
2188         # verification key.
2189         d.addCallback(lambda ignored:
2190             self.shouldFail(LayoutInvalid, "key before signature",
2191                             None, mw0.put_verification_key,
2192                             self.verification_key))
2193
2194         # Now write the share hashes.
2195         d.addCallback(lambda ignored:
2196             mw0.put_sharehashes(self.share_hash_chain))
2197         # We should be able to write the root hash now too
2198         d.addCallback(lambda ignored:
2199             mw0.put_root_hash(self.root_hash))
2200
2201         # We should still be unable to put the verification key
2202         d.addCallback(lambda ignored:
2203             self.shouldFail(LayoutInvalid, "key before signature",
2204                             None, mw0.put_verification_key,
2205                             self.verification_key))
2206
2207         d.addCallback(lambda ignored:
2208             mw0.put_signature(self.signature))
2209
2210         # We shouldn't be able to write the offsets to the remote server
2211         # until the offset table is finished; IOW, until we have written
2212         # the verification key.
2213         d.addCallback(lambda ignored:
2214             self.shouldFail(LayoutInvalid, "offsets before verification key",
2215                             None,
2216                             mw0.finish_publishing))
2217
2218         d.addCallback(lambda ignored:
2219             mw0.put_verification_key(self.verification_key))
2220         return d
2221
2222
2223     def test_end_to_end(self):
2224         mw = self._make_new_mw("si1", 0)
2225         # Write a share using the mutable writer, and make sure that the
2226         # reader knows how to read everything back to us.
2227         d = defer.succeed(None)
2228         for i in xrange(6):
2229             d.addCallback(lambda ignored, i=i:
2230                 mw.put_block(self.block, i, self.salt))
2231         d.addCallback(lambda ignored:
2232             mw.put_encprivkey(self.encprivkey))
2233         d.addCallback(lambda ignored:
2234             mw.put_blockhashes(self.block_hash_tree))
2235         d.addCallback(lambda ignored:
2236             mw.put_sharehashes(self.share_hash_chain))
2237         d.addCallback(lambda ignored:
2238             mw.put_root_hash(self.root_hash))
2239         d.addCallback(lambda ignored:
2240             mw.put_signature(self.signature))
2241         d.addCallback(lambda ignored:
2242             mw.put_verification_key(self.verification_key))
2243         d.addCallback(lambda ignored:
2244             mw.finish_publishing())
2245
2246         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2247         def _check_block_and_salt((block, salt)):
2248             self.failUnlessEqual(block, self.block)
2249             self.failUnlessEqual(salt, self.salt)
2250
2251         for i in xrange(6):
2252             d.addCallback(lambda ignored, i=i:
2253                 mr.get_block_and_salt(i))
2254             d.addCallback(_check_block_and_salt)
2255
2256         d.addCallback(lambda ignored:
2257             mr.get_encprivkey())
2258         d.addCallback(lambda encprivkey:
2259             self.failUnlessEqual(self.encprivkey, encprivkey))
2260
2261         d.addCallback(lambda ignored:
2262             mr.get_blockhashes())
2263         d.addCallback(lambda blockhashes:
2264             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2265
2266         d.addCallback(lambda ignored:
2267             mr.get_sharehashes())
2268         d.addCallback(lambda sharehashes:
2269             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2270
2271         d.addCallback(lambda ignored:
2272             mr.get_signature())
2273         d.addCallback(lambda signature:
2274             self.failUnlessEqual(signature, self.signature))
2275
2276         d.addCallback(lambda ignored:
2277             mr.get_verification_key())
2278         d.addCallback(lambda verification_key:
2279             self.failUnlessEqual(verification_key, self.verification_key))
2280
2281         d.addCallback(lambda ignored:
2282             mr.get_seqnum())
2283         d.addCallback(lambda seqnum:
2284             self.failUnlessEqual(seqnum, 0))
2285
2286         d.addCallback(lambda ignored:
2287             mr.get_root_hash())
2288         d.addCallback(lambda root_hash:
2289             self.failUnlessEqual(self.root_hash, root_hash))
2290
2291         d.addCallback(lambda ignored:
2292             mr.get_encoding_parameters())
2293         def _check_encoding_parameters((k, n, segsize, datalen)):
2294             self.failUnlessEqual(k, 3)
2295             self.failUnlessEqual(n, 10)
2296             self.failUnlessEqual(segsize, 6)
2297             self.failUnlessEqual(datalen, 36)
2298         d.addCallback(_check_encoding_parameters)
2299
2300         d.addCallback(lambda ignored:
2301             mr.get_checkstring())
2302         d.addCallback(lambda checkstring:
2303             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2304         return d
2305
2306
2307     def test_is_sdmf(self):
2308         # The MDMFSlotReadProxy should also know how to read SDMF files,
2309         # since it will encounter them on the grid. Callers use the
2310         # is_sdmf method to test this.
2311         self.write_sdmf_share_to_server("si1")
2312         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2313         d = mr.is_sdmf()
2314         d.addCallback(lambda issdmf:
2315             self.failUnless(issdmf))
2316         return d
2317
2318
2319     def test_reads_sdmf(self):
2320         # The slot read proxy should, naturally, know how to tell us
2321         # about data in the SDMF format
2322         self.write_sdmf_share_to_server("si1")
2323         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2324         d = defer.succeed(None)
2325         d.addCallback(lambda ignored:
2326             mr.is_sdmf())
2327         d.addCallback(lambda issdmf:
2328             self.failUnless(issdmf))
2329
2330         # What do we need to read?
2331         #  - The sharedata
2332         #  - The salt
2333         d.addCallback(lambda ignored:
2334             mr.get_block_and_salt(0))
2335         def _check_block_and_salt(results):
2336             block, salt = results
2337             # Our original file is 36 bytes long. Then each share is 12
2338             # bytes in size. The share is composed entirely of the
2339             # letter a. self.block contains two a's, so 6 * self.block is
2340             # what we are looking for.
2341             self.failUnlessEqual(block, self.block * 6)
2342             self.failUnlessEqual(salt, self.salt)
2343         d.addCallback(_check_block_and_salt)
2344
2345         #  - The blockhashes
2346         d.addCallback(lambda ignored:
2347             mr.get_blockhashes())
2348         d.addCallback(lambda blockhashes:
2349             self.failUnlessEqual(self.block_hash_tree,
2350                                  blockhashes,
2351                                  blockhashes))
2352         #  - The sharehashes
2353         d.addCallback(lambda ignored:
2354             mr.get_sharehashes())
2355         d.addCallback(lambda sharehashes:
2356             self.failUnlessEqual(self.share_hash_chain,
2357                                  sharehashes))
2358         #  - The keys
2359         d.addCallback(lambda ignored:
2360             mr.get_encprivkey())
2361         d.addCallback(lambda encprivkey:
2362             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2363         d.addCallback(lambda ignored:
2364             mr.get_verification_key())
2365         d.addCallback(lambda verification_key:
2366             self.failUnlessEqual(verification_key,
2367                                  self.verification_key,
2368                                  verification_key))
2369         #  - The signature
2370         d.addCallback(lambda ignored:
2371             mr.get_signature())
2372         d.addCallback(lambda signature:
2373             self.failUnlessEqual(signature, self.signature, signature))
2374
2375         #  - The sequence number
2376         d.addCallback(lambda ignored:
2377             mr.get_seqnum())
2378         d.addCallback(lambda seqnum:
2379             self.failUnlessEqual(seqnum, 0, seqnum))
2380
2381         #  - The root hash
2382         d.addCallback(lambda ignored:
2383             mr.get_root_hash())
2384         d.addCallback(lambda root_hash:
2385             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2386         return d
2387
2388
2389     def test_only_reads_one_segment_sdmf(self):
2390         # SDMF shares have only one segment, so it doesn't make sense to
2391         # read more segments than that. The reader should know this and
2392         # complain if we try to do that.
2393         self.write_sdmf_share_to_server("si1")
2394         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2395         d = defer.succeed(None)
2396         d.addCallback(lambda ignored:
2397             mr.is_sdmf())
2398         d.addCallback(lambda issdmf:
2399             self.failUnless(issdmf))
2400         d.addCallback(lambda ignored:
2401             self.shouldFail(LayoutInvalid, "test bad segment",
2402                             None,
2403                             mr.get_block_and_salt, 1))
2404         return d
2405
2406
2407     def test_read_with_prefetched_mdmf_data(self):
2408         # The MDMFSlotReadProxy will prefill certain fields if you pass
2409         # it data that you have already fetched. This is useful for
2410         # cases like the Servermap, which prefetches ~2kb of data while
2411         # finding out which shares are on the remote peer so that it
2412         # doesn't waste round trips.
2413         mdmf_data = self.build_test_mdmf_share()
2414         self.write_test_share_to_server("si1")
2415         def _make_mr(ignored, length):
2416             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2417             return mr
2418
2419         d = defer.succeed(None)
2420         # This should be enough to fill in both the encoding parameters
2421         # and the table of offsets, which will complete the version
2422         # information tuple.
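        # (The 41-byte checkstring plus 18 bytes of encoding parameters
        # plus eight 8-byte offsets comes to exactly 123 bytes, hence the
        # magic length below.)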
2423         d.addCallback(_make_mr, 123)
2424         d.addCallback(lambda mr:
2425             mr.get_verinfo())
2426         def _check_verinfo(verinfo):
2427             self.failUnless(verinfo)
2428             self.failUnlessEqual(len(verinfo), 9)
2429             (seqnum,
2430              root_hash,
2431              salt_hash,
2432              segsize,
2433              datalen,
2434              k,
2435              n,
2436              prefix,
2437              offsets) = verinfo
2438             self.failUnlessEqual(seqnum, 0)
2439             self.failUnlessEqual(root_hash, self.root_hash)
2440             self.failUnlessEqual(segsize, 6)
2441             self.failUnlessEqual(datalen, 36)
2442             self.failUnlessEqual(k, 3)
2443             self.failUnlessEqual(n, 10)
2444             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2445                                           1,
2446                                           seqnum,
2447                                           root_hash,
2448                                           k,
2449                                           n,
2450                                           segsize,
2451                                           datalen)
2452             self.failUnlessEqual(expected_prefix, prefix)
2453             self.failUnlessEqual(self.rref.read_count, 0)
2454         d.addCallback(_check_verinfo)
2455         # This is not enough data to read a block and its salt, so the
2456         # wrapper should attempt to read them from the remote server.
2457         d.addCallback(_make_mr, 123)
2458         d.addCallback(lambda mr:
2459             mr.get_block_and_salt(0))
2460         def _check_block_and_salt((block, salt)):
2461             self.failUnlessEqual(block, self.block)
2462             self.failUnlessEqual(salt, self.salt)
2463             self.failUnlessEqual(self.rref.read_count, 1)
2464         # This should be enough data to read one block.
2465         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2466         d.addCallback(lambda mr:
2467             mr.get_block_and_salt(0))
2468         d.addCallback(_check_block_and_salt)
2469         return d
2470
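    # A short usage sketch, illustrative only (trial will not collect
    # this helper, since its name does not start with "test"): a caller
    # that has already fetched the first ~123 bytes of a share can hand
    # them to the proxy and read the version info without any remote
    # round trips, exactly as the test above demonstrates.
    def _example_prefetched_verinfo(self):
        mdmf_data = self.build_test_mdmf_share()
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:123])
        d = mr.get_verinfo()   # answered from the prefetched cache
        d.addCallback(lambda verinfo:
            self.failUnlessEqual(self.rref.read_count, 0))
        return d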
2471
2472     def test_read_with_prefetched_sdmf_data(self):
2473         sdmf_data = self.build_test_sdmf_share()
2474         self.write_sdmf_share_to_server("si1")
2475         def _make_mr(ignored, length):
2476             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2477             return mr
2478
2479         d = defer.succeed(None)
2480         # This should be enough to get us the encoding parameters,
2481         # offset table, and everything else we need to build a verinfo
2482         # tuple.
2483         d.addCallback(_make_mr, 123)
2484         d.addCallback(lambda mr:
2485             mr.get_verinfo())
2486         def _check_verinfo(verinfo):
2487             self.failUnless(verinfo)
2488             self.failUnlessEqual(len(verinfo), 9)
2489             (seqnum,
2490              root_hash,
2491              salt,
2492              segsize,
2493              datalen,
2494              k,
2495              n,
2496              prefix,
2497              offsets) = verinfo
2498             self.failUnlessEqual(seqnum, 0)
2499             self.failUnlessEqual(root_hash, self.root_hash)
2500             self.failUnlessEqual(salt, self.salt)
2501             self.failUnlessEqual(segsize, 36)
2502             self.failUnlessEqual(datalen, 36)
2503             self.failUnlessEqual(k, 3)
2504             self.failUnlessEqual(n, 10)
2505             expected_prefix = struct.pack(SIGNED_PREFIX,
2506                                           0,
2507                                           seqnum,
2508                                           root_hash,
2509                                           salt,
2510                                           k,
2511                                           n,
2512                                           segsize,
2513                                           datalen)
2514             self.failUnlessEqual(expected_prefix, prefix)
2515             self.failUnlessEqual(self.rref.read_count, 0)
2516         d.addCallback(_check_verinfo)
2517         # This shouldn't be enough to read any share data.
2518         d.addCallback(_make_mr, 123)
2519         d.addCallback(lambda mr:
2520             mr.get_block_and_salt(0))
2521         def _check_block_and_salt((block, salt)):
2522             self.failUnlessEqual(block, self.block * 6)
2523             self.failUnlessEqual(salt, self.salt)
2524             # TODO: Fix the read routine so that it reads only the data
2525             #       that it has cached if it can't read all of it.
2526             self.failUnlessEqual(self.rref.read_count, 2)
2527
2528         # This should be enough to read share data.
2529         d.addCallback(_make_mr, self.offsets['share_data'])
2530         d.addCallback(lambda mr:
2531             mr.get_block_and_salt(0))
2532         d.addCallback(_check_block_and_salt)
2533         return d
2534
2535
2536     def test_read_with_empty_mdmf_file(self):
2537         # Some tests upload a file with no contents in order to test
2538         # things unrelated to how file content is handled. The reader
2539         # should behave sensibly in these cases.
2540         self.write_test_share_to_server("si1", empty=True)
2541         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2542         # We should be able to get the encoding parameters, and they
2543         # should be correct.
2544         d = defer.succeed(None)
2545         d.addCallback(lambda ignored:
2546             mr.get_encoding_parameters())
2547         def _check_encoding_parameters(params):
2548             self.failUnlessEqual(len(params), 4)
2549             k, n, segsize, datalen = params
2550             self.failUnlessEqual(k, 3)
2551             self.failUnlessEqual(n, 10)
2552             self.failUnlessEqual(segsize, 0)
2553             self.failUnlessEqual(datalen, 0)
2554         d.addCallback(_check_encoding_parameters)
2555
2556         # We should not be able to fetch a block, since there are no
2557         # blocks to fetch
2558         d.addCallback(lambda ignored:
2559             self.shouldFail(LayoutInvalid, "get block on empty file",
2560                             None,
2561                             mr.get_block_and_salt, 0))
2562         return d
2563
2564
2565     def test_read_with_empty_sdmf_file(self):
2566         self.write_sdmf_share_to_server("si1", empty=True)
2567         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2568         # We should be able to get the encoding parameters, and they
2569         # should be correct
2570         d = defer.succeed(None)
2571         d.addCallback(lambda ignored:
2572             mr.get_encoding_parameters())
2573         def _check_encoding_parameters(params):
2574             self.failUnlessEqual(len(params), 4)
2575             k, n, segsize, datalen = params
2576             self.failUnlessEqual(k, 3)
2577             self.failUnlessEqual(n, 10)
2578             self.failUnlessEqual(segsize, 0)
2579             self.failUnlessEqual(datalen, 0)
2580         d.addCallback(_check_encoding_parameters)
2581
2582         # An empty file has no blocks, so it does not make sense to get
2583         # one, and we should not be able to.
2584         d.addCallback(lambda ignored:
2585             self.shouldFail(LayoutInvalid, "get block on an empty file",
2586                             None,
2587                             mr.get_block_and_salt, 0))
2588         return d
2589
2590
2591     def test_verinfo_with_sdmf_file(self):
2592         self.write_sdmf_share_to_server("si1")
2593         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2594         # We should be able to get the version information.
2595         d = defer.succeed(None)
2596         d.addCallback(lambda ignored:
2597             mr.get_verinfo())
2598         def _check_verinfo(verinfo):
2599             self.failUnless(verinfo)
2600             self.failUnlessEqual(len(verinfo), 9)
2601             (seqnum,
2602              root_hash,
2603              salt,
2604              segsize,
2605              datalen,
2606              k,
2607              n,
2608              prefix,
2609              offsets) = verinfo
2610             self.failUnlessEqual(seqnum, 0)
2611             self.failUnlessEqual(root_hash, self.root_hash)
2612             self.failUnlessEqual(salt, self.salt)
2613             self.failUnlessEqual(segsize, 36)
2614             self.failUnlessEqual(datalen, 36)
2615             self.failUnlessEqual(k, 3)
2616             self.failUnlessEqual(n, 10)
2617             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2618                                           0,
2619                                           seqnum,
2620                                           root_hash,
2621                                           salt,
2622                                           k,
2623                                           n,
2624                                           segsize,
2625                                           datalen)
2626             self.failUnlessEqual(prefix, expected_prefix)
2627             self.failUnlessEqual(offsets, self.offsets)
2628         d.addCallback(_check_verinfo)
2629         return d
2630
2631
2632     def test_verinfo_with_mdmf_file(self):
2633         self.write_test_share_to_server("si1")
2634         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2635         d = defer.succeed(None)
2636         d.addCallback(lambda ignored:
2637             mr.get_verinfo())
2638         def _check_verinfo(verinfo):
2639             self.failUnless(verinfo)
2640             self.failUnlessEqual(len(verinfo), 9)
2641             (seqnum,
2642              root_hash,
2643              IV,
2644              segsize,
2645              datalen,
2646              k,
2647              n,
2648              prefix,
2649              offsets) = verinfo
2650             self.failUnlessEqual(seqnum, 0)
2651             self.failUnlessEqual(root_hash, self.root_hash)
2652             self.failIf(IV)
2653             self.failUnlessEqual(segsize, 6)
2654             self.failUnlessEqual(datalen, 36)
2655             self.failUnlessEqual(k, 3)
2656             self.failUnlessEqual(n, 10)
2657             expected_prefix = struct.pack(">BQ32s BBQQ",
2658                                           1,
2659                                           seqnum,
2660                                           root_hash,
2661                                           k,
2662                                           n,
2663                                           segsize,
2664                                           datalen)
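            # (Worked size check: struct.calcsize(">BQ32s16sBBQQ") is 75,
            # so the signed SDMF prefix above is 75 bytes, while this MDMF
            # prefix omits the 16-byte salt and calcsize(">BQ32sBBQQ") is
            # 59 bytes.)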
2665             self.failUnlessEqual(prefix, expected_prefix)
2666             self.failUnlessEqual(offsets, self.offsets)
2667         d.addCallback(_check_verinfo)
2668         return d
2669
2670
2671     def test_sdmf_writer(self):
2672         # Go through the motions of writing an SDMF share to the storage
2673         # server. Then read the storage server to see that the share got
2674         # written in the way that we think it should have been.
2675
2676         # We do this first so that the necessary instance variables get
2677         # set the way we want them for the tests below.
2678         data = self.build_test_sdmf_share()
2679         sdmfr = SDMFSlotWriteProxy(0,
2680                                    self.rref,
2681                                    "si1",
2682                                    self.secrets,
2683                                    0, 3, 10, 36, 36)
2684         # Put the block and salt.
2685         sdmfr.put_block(self.blockdata, 0, self.salt)
2686
2687         # Put the encprivkey
2688         sdmfr.put_encprivkey(self.encprivkey)
2689
2690         # Put the block and share hash chains
2691         sdmfr.put_blockhashes(self.block_hash_tree)
2692         sdmfr.put_sharehashes(self.share_hash_chain)
2693         sdmfr.put_root_hash(self.root_hash)
2694
2695         # Put the signature
2696         sdmfr.put_signature(self.signature)
2697
2698         # Put the verification key
2699         sdmfr.put_verification_key(self.verification_key)
2700
2701         # Now check to make sure that nothing has been written yet.
2702         self.failUnlessEqual(self.rref.write_count, 0)
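        # (Design note, as this test's counters show: every put_* call
        # above only buffers data inside the proxy, and finish_publishing()
        # sends the whole share in a single remote call, so write_count
        # stays 0 here and becomes exactly 1 below.)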
2703
2704         # Now finish publishing
2705         d = sdmfr.finish_publishing()
2706         def _then(ignored):
2707             self.failUnlessEqual(self.rref.write_count, 1)
2708             read = self.ss.remote_slot_readv
2709             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2710                                  {0: [data]})
2711         d.addCallback(_then)
2712         return d
2713
2714
2715     def test_sdmf_writer_preexisting_share(self):
2716         data = self.build_test_sdmf_share()
2717         self.write_sdmf_share_to_server("si1")
2718
2719         # Now there is a share on the storage server. To successfully
2720         # write, we need to set the checkstring correctly. When we
2721         # don't, no write should occur.
2722         sdmfw = SDMFSlotWriteProxy(0,
2723                                    self.rref,
2724                                    "si1",
2725                                    self.secrets,
2726                                    1, 3, 10, 36, 36)
2727         sdmfw.put_block(self.blockdata, 0, self.salt)
2728
2729         # Put the encprivkey
2730         sdmfw.put_encprivkey(self.encprivkey)
2731
2732         # Put the block and share hash chains
2733         sdmfw.put_blockhashes(self.block_hash_tree)
2734         sdmfw.put_sharehashes(self.share_hash_chain)
2735
2736         # Put the root hash
2737         sdmfw.put_root_hash(self.root_hash)
2738
2739         # Put the signature
2740         sdmfw.put_signature(self.signature)
2741
2742         # Put the verification key
2743         sdmfw.put_verification_key(self.verification_key)
2744
2745         # We shouldn't have a checkstring yet
2746         self.failUnlessEqual(sdmfw.get_checkstring(), "")
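        # (Protocol sketch, hedged: the checkstring stands in for the
        # server-side test vector. Since we never set one, the first
        # finish_publishing() below is rejected by the occupied slot,
        # and the failure result carries the slot's current checkstring,
        # which we adopt via set_checkstring() so that the retry
        # succeeds.)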
2747
2748         d = sdmfw.finish_publishing()
2749         def _then(results):
2750             self.failIf(results[0])
2751             # this is the correct checkstring
2752             self._expected_checkstring = results[1][0][0]
2753             return self._expected_checkstring
2754
2755         d.addCallback(_then)
2756         d.addCallback(sdmfw.set_checkstring)
2757         d.addCallback(lambda ignored:
2758             sdmfw.get_checkstring())
2759         d.addCallback(lambda checkstring:
2760             self.failUnlessEqual(checkstring, self._expected_checkstring))
2761         d.addCallback(lambda ignored:
2762             sdmfw.finish_publishing())
2763         def _then_again(results):
2764             self.failUnless(results[0])
2765             read = self.ss.remote_slot_readv
2766             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2767                                  {0: [struct.pack(">Q", 1)]})
2768             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2769                                  {0: [data[9:]]})
2770         d.addCallback(_then_again)
2771         return d
2772
2773
2774 class Stats(unittest.TestCase):
2775
2776     def setUp(self):
2777         self.sparent = LoggingServiceParent()
2778         self._lease_secret = itertools.count()
2779     def tearDown(self):
2780         return self.sparent.stopService()
2781
2782     def workdir(self, name):
2783         basedir = os.path.join("storage", "Server", name)
2784         return basedir
2785
2786     def create(self, name):
2787         workdir = self.workdir(name)
2788         ss = StorageServer(workdir, "\x00" * 20)
2789         ss.setServiceParent(self.sparent)
2790         return ss
2791
2792     def test_latencies(self):
2793         ss = self.create("test_latencies")
2794         for i in range(10000):
2795             ss.add_latency("allocate", 1.0 * i)
2796         for i in range(1000):
2797             ss.add_latency("renew", 1.0 * i)
2798         for i in range(20):
2799             ss.add_latency("write", 1.0 * i)
2800         for i in range(10):
2801             ss.add_latency("cancel", 2.0 * i)
2802         ss.add_latency("get", 5.0)
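        # A worked example of what the assertions below rely on (inferred
        # from this test's expectations): each category retains only the
        # most recent 1000 samples, so the surviving "allocate" samples
        # are 9000..9999 and their median is 9500; categories with too few
        # samples report None for the finer percentiles, and "get", with a
        # single sample, reports None for everything including the mean.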
2803
2804         output = ss.get_latencies()
2805
2806         self.failUnlessEqual(sorted(output.keys()),
2807                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2808         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2809         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2810         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2811         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2812         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2813         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2814         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2815         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2816         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2817
2818         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2819         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2820         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2821         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2822         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2823         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2824         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2825         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2826         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2827
2828         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2829         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2830         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2831         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2832         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2833         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2834         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2835         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2836         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2837
2838         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2839         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2840         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2841         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2842         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2843         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2844         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2845         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2846         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2847
2848         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2849         self.failUnless(output["get"]["mean"] is None, output)
2850         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2851         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2852         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2853         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2854         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2855         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2856         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2857
2858 def remove_tags(s):
2859     s = re.sub(r'<[^>]*>', ' ', s)
2860     s = re.sub(r'\s+', ' ', s)
2861     return s
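# A quick usage example:
#   remove_tags("<h1>Storage Server Status</h1>") == " Storage Server Status "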
2862
2863 class MyBucketCountingCrawler(BucketCountingCrawler):
2864     def finished_prefix(self, cycle, prefix):
2865         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2866         if self.hook_ds:
2867             d = self.hook_ds.pop(0)
2868             d.callback(None)
2869
2870 class MyStorageServer(StorageServer):
2871     def add_bucket_counter(self):
2872         statefile = os.path.join(self.storedir, "bucket_counter.state")
2873         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2874         self.bucket_counter.setServiceParent(self)
2875
2876 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2877
2878     def setUp(self):
2879         self.s = service.MultiService()
2880         self.s.startService()
2881     def tearDown(self):
2882         return self.s.stopService()
2883
2884     def test_bucket_counter(self):
2885         basedir = "storage/BucketCounter/bucket_counter"
2886         fileutil.make_dirs(basedir)
2887         ss = StorageServer(basedir, "\x00" * 20)
2888         # to make sure we capture the bucket-counting-crawler in the middle
2889         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2890         # also make it start sooner than usual.
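        # (Pacing model, roughly: the crawler processes prefixes until
        # cpu_slice seconds have elapsed in the current timeslice and then
        # yields, so cpu_slice=0 forces a yield right after the first
        # prefix, and slow_start=0 removes the usual startup delay.)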
2891         ss.bucket_counter.slow_start = 0
2892         orig_cpu_slice = ss.bucket_counter.cpu_slice
2893         ss.bucket_counter.cpu_slice = 0
2894         ss.setServiceParent(self.s)
2895
2896         w = StorageStatus(ss)
2897
2898         # this sample is before the crawler has started doing anything
2899         html = w.renderSynchronously()
2900         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2901         s = remove_tags(html)
2902         self.failUnlessIn("Accepting new shares: Yes", s)
2903         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2904         self.failUnlessIn("Total buckets: Not computed yet", s)
2905         self.failUnlessIn("Next crawl in", s)
2906
2907         # give the bucket-counting-crawler one tick to get started. The
2908         # cpu_slice=0 will force it to yield right after it processes the
2909         # first prefix
2910
2911         d = fireEventually()
2912         def _check(ignored):
2913             # are we really right after the first prefix?
2914             state = ss.bucket_counter.get_state()
2915             if state["last-complete-prefix"] is None:
2916                 d2 = fireEventually()
2917                 d2.addCallback(_check)
2918                 return d2
2919             self.failUnlessEqual(state["last-complete-prefix"],
2920                                  ss.bucket_counter.prefixes[0])
2921             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2922             html = w.renderSynchronously()
2923             s = remove_tags(html)
2924             self.failUnlessIn(" Current crawl ", s)
2925             self.failUnlessIn(" (next work in ", s)
2926         d.addCallback(_check)
2927
2928         # now give it enough time to complete a full cycle
2929         def _watch():
2930             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2931         d.addCallback(lambda ignored: self.poll(_watch))
2932         def _check2(ignored):
2933             ss.bucket_counter.cpu_slice = orig_cpu_slice
2934             html = w.renderSynchronously()
2935             s = remove_tags(html)
2936             self.failUnlessIn("Total buckets: 0 (the number of", s)
2937             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2938         d.addCallback(_check2)
2939         return d
2940
2941     def test_bucket_counter_cleanup(self):
2942         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2943         fileutil.make_dirs(basedir)
2944         ss = StorageServer(basedir, "\x00" * 20)
2945         # to make sure we capture the bucket-counting-crawler in the middle
2946         # of a cycle, we reach in and reduce its maximum slice time to 0.
2947         ss.bucket_counter.slow_start = 0
2948         orig_cpu_slice = ss.bucket_counter.cpu_slice
2949         ss.bucket_counter.cpu_slice = 0
2950         ss.setServiceParent(self.s)
2951
2952         d = fireEventually()
2953
2954         def _after_first_prefix(ignored):
2955             state = ss.bucket_counter.state
2956             if state["last-complete-prefix"] is None:
2957                 d2 = fireEventually()
2958                 d2.addCallback(_after_first_prefix)
2959                 return d2
2960             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2961             # now sneak in and mess with its state, to make sure it cleans up
2962             # properly at the end of the cycle
2963             self.failUnlessEqual(state["last-complete-prefix"],
2964                                  ss.bucket_counter.prefixes[0])
2965             state["bucket-counts"][-12] = {}
2966             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2967             ss.bucket_counter.save_state()
2968         d.addCallback(_after_first_prefix)
2969
2970         # now give it enough time to complete a cycle
2971         def _watch():
2972             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2973         d.addCallback(lambda ignored: self.poll(_watch))
2974         def _check2(ignored):
2975             ss.bucket_counter.cpu_slice = orig_cpu_slice
2976             s = ss.bucket_counter.get_state()
2977             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2978             self.failIf("bogusprefix!" in s["storage-index-samples"],
2979                         s["storage-index-samples"].keys())
2980         d.addCallback(_check2)
2981         return d
2982
2983     def test_bucket_counter_eta(self):
2984         basedir = "storage/BucketCounter/bucket_counter_eta"
2985         fileutil.make_dirs(basedir)
2986         ss = MyStorageServer(basedir, "\x00" * 20)
2987         ss.bucket_counter.slow_start = 0
2988         # these will be fired inside finished_prefix()
2989         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2990         w = StorageStatus(ss)
2991
2992         d = defer.Deferred()
2993
2994         def _check_1(ignored):
2995             # no ETA is available yet
2996             html = w.renderSynchronously()
2997             s = remove_tags(html)
2998             self.failUnlessIn("complete (next work", s)
2999
3000         def _check_2(ignored):
3001             # one prefix has finished, so an ETA based upon that elapsed time
3002             # should be available.
3003             html = w.renderSynchronously()
3004             s = remove_tags(html)
3005             self.failUnlessIn("complete (ETA ", s)
3006
3007         def _check_3(ignored):
3008             # two prefixes have finished
3009             html = w.renderSynchronously()
3010             s = remove_tags(html)
3011             self.failUnlessIn("complete (ETA ", s)
3012             d.callback("done")
3013
3014         hooks[0].addCallback(_check_1).addErrback(d.errback)
3015         hooks[1].addCallback(_check_2).addErrback(d.errback)
3016         hooks[2].addCallback(_check_3).addErrback(d.errback)
3017
3018         ss.setServiceParent(self.s)
3019         return d
3020
3021 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
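    # A crawler that parks itself right after the first bucket: dropping
    # cpu_slice below zero makes the next elapsed-time check yield
    # immediately, and once the flag has been cleared, yielding() restores
    # a generous slice so the rest of the cycle runs quickly.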
3022     stop_after_first_bucket = False
3023     def process_bucket(self, *args, **kwargs):
3024         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3025         if self.stop_after_first_bucket:
3026             self.stop_after_first_bucket = False
3027             self.cpu_slice = -1.0
3028     def yielding(self, sleep_time):
3029         if not self.stop_after_first_bucket:
3030             self.cpu_slice = 500
3031
3032 class BrokenStatResults:
3033     pass
3034 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
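    # Wraps stat() results in an object that copies every field except
    # st_blocks, simulating platforms whose stat results lack it, so the
    # crawler's disk-usage accounting has to get along without it.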
3035     def stat(self, fn):
3036         s = os.stat(fn)
3037         bsr = BrokenStatResults()
3038         for attrname in dir(s):
3039             if attrname.startswith("_"):
3040                 continue
3041             if attrname == "st_blocks":
3042                 continue
3043             setattr(bsr, attrname, getattr(s, attrname))
3044         return bsr
3045
3046 class InstrumentedStorageServer(StorageServer):
3047     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3048 class No_ST_BLOCKS_StorageServer(StorageServer):
3049     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3050
3051 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3052
3053     def setUp(self):
3054         self.s = service.MultiService()
3055         self.s.startService()
3056     def tearDown(self):
3057         return self.s.stopService()
3058
3059     def make_shares(self, ss):
3060         def make(si):
3061             return (si, hashutil.tagged_hash("renew", si),
3062                     hashutil.tagged_hash("cancel", si))
3063         def make_mutable(si):
3064             return (si, hashutil.tagged_hash("renew", si),
3065                     hashutil.tagged_hash("cancel", si),
3066                     hashutil.tagged_hash("write-enabler", si))
3067         def make_extra_lease(si, num):
3068             return (hashutil.tagged_hash("renew-%d" % num, si),
3069                     hashutil.tagged_hash("cancel-%d" % num, si))
3070
3071         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3072         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3073         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3074         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3075         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3076         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3077         sharenums = [0]
3078         canary = FakeCanary()
3079         # note: 'tahoe debug dump-share' will not handle this file, since the
3080         # inner contents are not a valid CHK share
3081         data = "\xff" * 1000
3082
3083         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3084                                          1000, canary)
3085         w[0].remote_write(0, data)
3086         w[0].remote_close()
3087
3088         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3089                                          1000, canary)
3090         w[0].remote_write(0, data)
3091         w[0].remote_close()
3092         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3093
3094         writev = ss.remote_slot_testv_and_readv_and_writev
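        # (Format note: each share number maps to a (test vector, write
        # vector, new length) tuple; an empty test vector applies the
        # writes unconditionally, and [(0, data)] writes `data` at
        # offset 0.)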
3095         writev(mutable_si_2, (we2, rs2, cs2),
3096                {0: ([], [(0,data)], len(data))}, [])
3097         writev(mutable_si_3, (we3, rs3, cs3),
3098                {0: ([], [(0,data)], len(data))}, [])
3099         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3100
3101         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3102         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3103         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3104
3105     def test_basic(self):
3106         basedir = "storage/LeaseCrawler/basic"
3107         fileutil.make_dirs(basedir)
3108         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3109         # make it start sooner than usual.
3110         lc = ss.lease_checker
3111         lc.slow_start = 0
3112         lc.cpu_slice = 500
3113         lc.stop_after_first_bucket = True
3114         webstatus = StorageStatus(ss)
3115
3116         # create a few shares, with some leases on them
3117         self.make_shares(ss)
3118         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3119
3120         # add a non-sharefile to exercise another code path
3121         fn = os.path.join(ss.sharedir,
3122                           storage_index_to_dir(immutable_si_0),
3123                           "not-a-share")
3124         f = open(fn, "wb")
3125         f.write("I am not a share.\n")
3126         f.close()
3127
3128         # this is before the crawl has started, so we're not in a cycle yet
3129         initial_state = lc.get_state()
3130         self.failIf(lc.get_progress()["cycle-in-progress"])
3131         self.failIfIn("cycle-to-date", initial_state)
3132         self.failIfIn("estimated-remaining-cycle", initial_state)
3133         self.failIfIn("estimated-current-cycle", initial_state)
3134         self.failUnlessIn("history", initial_state)
3135         self.failUnlessEqual(initial_state["history"], {})
3136
3137         ss.setServiceParent(self.s)
3138
3139         DAY = 24*60*60
3140
3141         d = fireEventually()
3142
3143         # now examine the state right after the first bucket has been
3144         # processed.
3145         def _after_first_bucket(ignored):
3146             initial_state = lc.get_state()
3147             if "cycle-to-date" not in initial_state:
3148                 d2 = fireEventually()
3149                 d2.addCallback(_after_first_bucket)
3150                 return d2
3151             self.failUnlessIn("cycle-to-date", initial_state)
3152             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3153             self.failUnlessIn("estimated-current-cycle", initial_state)
3154             self.failUnlessIn("history", initial_state)
3155             self.failUnlessEqual(initial_state["history"], {})
3156
3157             so_far = initial_state["cycle-to-date"]
3158             self.failUnlessEqual(so_far["expiration-enabled"], False)
3159             self.failUnlessIn("configured-expiration-mode", so_far)
3160             self.failUnlessIn("lease-age-histogram", so_far)
3161             lah = so_far["lease-age-histogram"]
3162             self.failUnlessEqual(type(lah), list)
3163             self.failUnlessEqual(len(lah), 1)
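            # each histogram entry is a (min-age, max-age, count) tuple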
3164             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3165             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3166             self.failUnlessEqual(so_far["corrupt-shares"], [])
3167             sr1 = so_far["space-recovered"]
3168             self.failUnlessEqual(sr1["examined-buckets"], 1)
3169             self.failUnlessEqual(sr1["examined-shares"], 1)
3170             self.failUnlessEqual(sr1["actual-shares"], 0)
3171             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3172             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3173             left = initial_state["estimated-remaining-cycle"]
3174             sr2 = left["space-recovered"]
3175             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3176             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3177             self.failIfEqual(sr2["actual-shares"], None)
3178             self.failIfEqual(sr2["configured-diskbytes"], None)
3179             self.failIfEqual(sr2["original-sharebytes"], None)
3180         d.addCallback(_after_first_bucket)
3181         d.addCallback(lambda ign: self.render1(webstatus))
3182         def _check_html_in_cycle(html):
3183             s = remove_tags(html)
3184             self.failUnlessIn("So far, this cycle has examined "
3185                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3186             self.failUnlessIn("and has recovered: "
3187                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3188                               "0 B (0 B / 0 B)", s)
3189             self.failUnlessIn("If expiration were enabled, "
3190                               "we would have recovered: "
3191                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3192                               " 0 B (0 B / 0 B) by now", s)
3193             self.failUnlessIn("and the remainder of this cycle "
3194                               "would probably recover: "
3195                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3196                               " 0 B (0 B / 0 B)", s)
3197             self.failUnlessIn("and the whole cycle would probably recover: "
3198                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3199                               " 0 B (0 B / 0 B)", s)
3200             self.failUnlessIn("if we were strictly using each lease's default "
3201                               "31-day lease lifetime", s)
3202             self.failUnlessIn("this cycle would be expected to recover: ", s)
3203         d.addCallback(_check_html_in_cycle)
3204
3205         # wait for the crawler to finish the first cycle. Nothing should have
3206         # been removed.
3207         def _wait():
3208             return lc.get_state()["last-cycle-finished"] is not None
3209         d.addCallback(lambda ign: self.poll(_wait))
3210
3211         def _after_first_cycle(ignored):
3212             s = lc.get_state()
3213             self.failIf("cycle-to-date" in s)
3214             self.failIf("estimated-remaining-cycle" in s)
3215             self.failIf("estimated-current-cycle" in s)
3216             last = s["history"][0]
3217             self.failUnlessIn("cycle-start-finish-times", last)
3218             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3219             self.failUnlessEqual(last["expiration-enabled"], False)
3220             self.failUnlessIn("configured-expiration-mode", last)
3221
3222             self.failUnlessIn("lease-age-histogram", last)
3223             lah = last["lease-age-histogram"]
3224             self.failUnlessEqual(type(lah), list)
3225             self.failUnlessEqual(len(lah), 1)
3226             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3227
3228             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3229             self.failUnlessEqual(last["corrupt-shares"], [])
3230
3231             rec = last["space-recovered"]
3232             self.failUnlessEqual(rec["examined-buckets"], 4)
3233             self.failUnlessEqual(rec["examined-shares"], 4)
3234             self.failUnlessEqual(rec["actual-buckets"], 0)
3235             self.failUnlessEqual(rec["original-buckets"], 0)
3236             self.failUnlessEqual(rec["configured-buckets"], 0)
3237             self.failUnlessEqual(rec["actual-shares"], 0)
3238             self.failUnlessEqual(rec["original-shares"], 0)
3239             self.failUnlessEqual(rec["configured-shares"], 0)
3240             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3241             self.failUnlessEqual(rec["original-diskbytes"], 0)
3242             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3243             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3244             self.failUnlessEqual(rec["original-sharebytes"], 0)
3245             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3246
3247             def _get_sharefile(si):
3248                 return list(ss._iter_share_files(si))[0]
3249             def count_leases(si):
3250                 return len(list(_get_sharefile(si).get_leases()))
3251             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3252             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3253             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3254             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3255         d.addCallback(_after_first_cycle)
3256         d.addCallback(lambda ign: self.render1(webstatus))
3257         def _check_html(html):
3258             s = remove_tags(html)
3259             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3260                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3261             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3262                               "(2 mutable / 2 immutable),", s)
3263             self.failUnlessIn("but expiration was not enabled", s)
3264         d.addCallback(_check_html)
3265         d.addCallback(lambda ign: self.render_json(webstatus))
3266         def _check_json(json):
3267             data = simplejson.loads(json)
3268             self.failUnlessIn("lease-checker", data)
3269             self.failUnlessIn("lease-checker-progress", data)
3270         d.addCallback(_check_json)
3271         return d
3272
3273     def backdate_lease(self, sf, renew_secret, new_expire_time):
3274         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3275         # "renew" a lease with a new_expire_time that is older than what the
3276         # current lease has), so we have to reach inside it.
3277         for i,lease in enumerate(sf.get_leases()):
3278             if lease.renew_secret == renew_secret:
3279                 lease.expiration_time = new_expire_time
3280                 f = open(sf.home, 'rb+')
3281                 sf._write_lease_record(f, i, lease)
3282                 f.close()
3283                 return
3284         raise IndexError("unable to renew non-existent lease")
3285
3286     def test_expire_age(self):
3287         basedir = "storage/LeaseCrawler/expire_age"
3288         fileutil.make_dirs(basedir)
3289         # setting expiration_override_lease_duration to 2000 means that any
3290         # lease which is more than 2000s old will be expired.
3291         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3292                                        expiration_enabled=True,
3293                                        expiration_mode="age",
3294                                        expiration_override_lease_duration=2000)
3295         # make it start sooner than usual.
3296         lc = ss.lease_checker
3297         lc.slow_start = 0
3298         lc.stop_after_first_bucket = True
3299         webstatus = StorageStatus(ss)
3300
3301         # create a few shares, with some leases on them
3302         self.make_shares(ss)
3303         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3304
3305         def count_shares(si):
3306             return len(list(ss._iter_share_files(si)))
3307         def _get_sharefile(si):
3308             return list(ss._iter_share_files(si))[0]
3309         def count_leases(si):
3310             return len(list(_get_sharefile(si).get_leases()))
3311
3312         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3313         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3314         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3315         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3316         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3317         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3318         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3319         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3320
3321         # artificially crank back the expiration time on the first lease of
3322         # each share, to make it look like it expired already (age=1000s).
3323         # Some shares have an extra lease which is set to expire at the
3324         # default time in 31 days from now (age=31days). We then run the
3325         # crawler, which will expire the first lease, making some shares get
3326         # deleted and others stay alive (with one remaining lease)
3327         now = time.time()
3328
3329         sf0 = _get_sharefile(immutable_si_0)
3330         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3331         sf0_size = os.stat(sf0.home).st_size
3332
3333         # immutable_si_1 gets an extra lease
3334         sf1 = _get_sharefile(immutable_si_1)
3335         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3336
3337         sf2 = _get_sharefile(mutable_si_2)
3338         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3339         sf2_size = os.stat(sf2.home).st_size
3340
3341         # mutable_si_3 gets an extra lease
3342         sf3 = _get_sharefile(mutable_si_3)
3343         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3344
3345         ss.setServiceParent(self.s)
3346
3347         d = fireEventually()
3348         # examine the state right after the first bucket has been processed
3349         def _after_first_bucket(ignored):
3350             p = lc.get_progress()
3351             if not p["cycle-in-progress"]:
3352                 d2 = fireEventually()
3353                 d2.addCallback(_after_first_bucket)
3354                 return d2
3355         d.addCallback(_after_first_bucket)
3356         d.addCallback(lambda ign: self.render1(webstatus))
3357         def _check_html_in_cycle(html):
3358             s = remove_tags(html)
3359             # the first bucket encountered gets deleted, and its prefix
3360             # happens to be about 1/5th of the way through the ring, so the
3361             # predictor thinks we'll have 5 shares and that we'll delete them
3362             # all. This part of the test depends upon the SIs landing right
3363             # where they do now.
3364             self.failUnlessIn("The remainder of this cycle is expected to "
3365                               "recover: 4 shares, 4 buckets", s)
3366             self.failUnlessIn("The whole cycle is expected to examine "
3367                               "5 shares in 5 buckets and to recover: "
3368                               "5 shares, 5 buckets", s)
3369         d.addCallback(_check_html_in_cycle)
3370
3371         # wait for the crawler to finish the first cycle. Two shares should
3372         # have been removed
3373         def _wait():
3374             return lc.get_state()["last-cycle-finished"] is not None
3375         d.addCallback(lambda ign: self.poll(_wait))
3376
3377         def _after_first_cycle(ignored):
3378             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3379             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3380             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3381             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3382             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3383             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3384
3385             s = lc.get_state()
3386             last = s["history"][0]
3387
3388             self.failUnlessEqual(last["expiration-enabled"], True)
3389             self.failUnlessEqual(last["configured-expiration-mode"],
3390                                  ("age", 2000, None, ("mutable", "immutable")))
3391             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3392
3393             rec = last["space-recovered"]
3394             self.failUnlessEqual(rec["examined-buckets"], 4)
3395             self.failUnlessEqual(rec["examined-shares"], 4)
3396             self.failUnlessEqual(rec["actual-buckets"], 2)
3397             self.failUnlessEqual(rec["original-buckets"], 2)
3398             self.failUnlessEqual(rec["configured-buckets"], 2)
3399             self.failUnlessEqual(rec["actual-shares"], 2)
3400             self.failUnlessEqual(rec["original-shares"], 2)
3401             self.failUnlessEqual(rec["configured-shares"], 2)
3402             size = sf0_size + sf2_size
3403             self.failUnlessEqual(rec["actual-sharebytes"], size)
3404             self.failUnlessEqual(rec["original-sharebytes"], size)
3405             self.failUnlessEqual(rec["configured-sharebytes"], size)
3406             # different platforms have different notions of "blocks used by
3407             # this file", so merely assert that it's a number
3408             self.failUnless(rec["actual-diskbytes"] >= 0,
3409                             rec["actual-diskbytes"])
3410             self.failUnless(rec["original-diskbytes"] >= 0,
3411                             rec["original-diskbytes"])
3412             self.failUnless(rec["configured-diskbytes"] >= 0,
3413                             rec["configured-diskbytes"])
3414         d.addCallback(_after_first_cycle)
3415         d.addCallback(lambda ign: self.render1(webstatus))
3416         def _check_html(html):
3417             s = remove_tags(html)
3418             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3419             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3420             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3421         d.addCallback(_check_html)
3422         return d
3423
3424     def test_expire_cutoff_date(self):
3425         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3426         fileutil.make_dirs(basedir)
3427         # setting cutoff-date to 2000 seconds ago means that any lease which
3428         # is more than 2000s old will be expired.
3429         now = time.time()
3430         then = int(now - 2000)
3431         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3432                                        expiration_enabled=True,
3433                                        expiration_mode="cutoff-date",
3434                                        expiration_cutoff_date=then)
3435         # make it start sooner than usual.
3436         lc = ss.lease_checker
3437         lc.slow_start = 0
3438         lc.stop_after_first_bucket = True
3439         webstatus = StorageStatus(ss)
3440
3441         # create a few shares, with some leases on them
3442         self.make_shares(ss)
3443         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3444
3445         def count_shares(si):
3446             return len(list(ss._iter_share_files(si)))
3447         def _get_sharefile(si):
3448             return list(ss._iter_share_files(si))[0]
3449         def count_leases(si):
3450             return len(list(_get_sharefile(si).get_leases()))
3451
3452         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3453         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3454         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3455         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3456         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3457         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3458         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3459         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3460
3461         # artificially crank back the expiration time on the first lease of
3462         # each share, to make it look like it was renewed 3000s ago. To achieve
3463         # this, we need to set the expiration time to now-3000+31days. This
3464         # will change when the lease format is improved to contain both
3465         # create/renew time and duration.
3466         new_expiration_time = now - 3000 + 31*24*60*60
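        # (Worked out: the implied renew time is new_expiration_time minus
        # the 31-day default duration, i.e. now - 3000, which falls 1000s
        # before the cutoff of now - 2000, so these leases are expirable.)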
3467
3468         # Some shares have an extra lease which is set to expire at the
3469         # default time in 31 days from now (age=31days). We then run the
3470         # crawler, which will expire the first lease, making some shares get
3471         # deleted and others stay alive (with one remaining lease)
3472
3473         sf0 = _get_sharefile(immutable_si_0)
3474         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3475         sf0_size = os.stat(sf0.home).st_size
3476
3477         # immutable_si_1 gets an extra lease
3478         sf1 = _get_sharefile(immutable_si_1)
3479         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3480
3481         sf2 = _get_sharefile(mutable_si_2)
3482         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3483         sf2_size = os.stat(sf2.home).st_size
3484
3485         # mutable_si_3 gets an extra lease
3486         sf3 = _get_sharefile(mutable_si_3)
3487         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3488
3489         ss.setServiceParent(self.s)
3490
3491         d = fireEventually()
3492         # examine the state right after the first bucket has been processed
3493         def _after_first_bucket(ignored):
3494             p = lc.get_progress()
3495             if not p["cycle-in-progress"]:
3496                 d2 = fireEventually()
3497                 d2.addCallback(_after_first_bucket)
3498                 return d2
3499         d.addCallback(_after_first_bucket)
3500         d.addCallback(lambda ign: self.render1(webstatus))
3501         def _check_html_in_cycle(html):
3502             s = remove_tags(html)
3503             # the first bucket encountered gets deleted, and its prefix
3504             # happens to be about 1/5th of the way through the ring, so the
3505             # predictor thinks we'll have 5 shares and that we'll delete them
3506             # all. This part of the test depends upon the SIs landing right
3507             # where they do now.
3508             self.failUnlessIn("The remainder of this cycle is expected to "
3509                               "recover: 4 shares, 4 buckets", s)
3510             self.failUnlessIn("The whole cycle is expected to examine "
3511                               "5 shares in 5 buckets and to recover: "
3512                               "5 shares, 5 buckets", s)
3513         d.addCallback(_check_html_in_cycle)
3514
3515         # wait for the crawler to finish the first cycle. Two shares should
3516         # have been removed
3517         def _wait():
3518             return lc.get_state()["last-cycle-finished"] is not None
3519         d.addCallback(lambda ign: self.poll(_wait))
3520
3521         def _after_first_cycle(ignored):
3522             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3523             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3524             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3525             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3526             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3527             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3528
3529             s = lc.get_state()
3530             last = s["history"][0]
3531
3532             self.failUnlessEqual(last["expiration-enabled"], True)
3533             self.failUnlessEqual(last["configured-expiration-mode"],
3534                                  ("cutoff-date", None, then,
3535                                   ("mutable", "immutable")))
3536             self.failUnlessEqual(last["leases-per-share-histogram"],
3537                                  {1: 2, 2: 2})
3538
3539             rec = last["space-recovered"]
3540             self.failUnlessEqual(rec["examined-buckets"], 4)
3541             self.failUnlessEqual(rec["examined-shares"], 4)
3542             self.failUnlessEqual(rec["actual-buckets"], 2)
3543             self.failUnlessEqual(rec["original-buckets"], 0)
3544             self.failUnlessEqual(rec["configured-buckets"], 2)
3545             self.failUnlessEqual(rec["actual-shares"], 2)
3546             self.failUnlessEqual(rec["original-shares"], 0)
3547             self.failUnlessEqual(rec["configured-shares"], 2)
3548             size = sf0_size + sf2_size
3549             self.failUnlessEqual(rec["actual-sharebytes"], size)
3550             self.failUnlessEqual(rec["original-sharebytes"], 0)
3551             self.failUnlessEqual(rec["configured-sharebytes"], size)
3552             # different platforms have different notions of "blocks used by
3553             # this file", so merely assert that it's a number
3554             self.failUnless(rec["actual-diskbytes"] >= 0,
3555                             rec["actual-diskbytes"])
3556             self.failUnless(rec["original-diskbytes"] >= 0,
3557                             rec["original-diskbytes"])
3558             self.failUnless(rec["configured-diskbytes"] >= 0,
3559                             rec["configured-diskbytes"])
3560         d.addCallback(_after_first_cycle)
3561         d.addCallback(lambda ign: self.render1(webstatus))
3562         def _check_html(html):
3563             s = remove_tags(html)
3564             self.failUnlessIn("Expiration Enabled:"
3565                               " expired leases will be removed", s)
3566             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3567             substr = "Leases created or last renewed before %s will be considered expired." % date
3568             self.failUnlessIn(substr, s)
3569             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3570         d.addCallback(_check_html)
3571         return d
3572
3573     def test_only_immutable(self):
3574         basedir = "storage/LeaseCrawler/only_immutable"
3575         fileutil.make_dirs(basedir)
3576         now = time.time()
3577         then = int(now - 2000)
3578         ss = StorageServer(basedir, "\x00" * 20,
3579                            expiration_enabled=True,
3580                            expiration_mode="cutoff-date",
3581                            expiration_cutoff_date=then,
3582                            expiration_sharetypes=("immutable",))
3583         lc = ss.lease_checker
3584         lc.slow_start = 0
3585         webstatus = StorageStatus(ss)
3586
3587         self.make_shares(ss)
3588         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3589         # set all leases to be expirable
3590         new_expiration_time = now - 3000 + 31*24*60*60
3591
3592         def count_shares(si):
3593             return len(list(ss._iter_share_files(si)))
3594         def _get_sharefile(si):
3595             return list(ss._iter_share_files(si))[0]
3596         def count_leases(si):
3597             return len(list(_get_sharefile(si).get_leases()))
3598
3599         sf0 = _get_sharefile(immutable_si_0)
3600         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3601         sf1 = _get_sharefile(immutable_si_1)
3602         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3603         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3604         sf2 = _get_sharefile(mutable_si_2)
3605         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3606         sf3 = _get_sharefile(mutable_si_3)
3607         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3608         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3609
3610         ss.setServiceParent(self.s)
3611         def _wait():
3612             return lc.get_state()["last-cycle-finished"] is not None
3613         d = self.poll(_wait)
3614
3615         def _after_first_cycle(ignored):
3616             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3617             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3618             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3619             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3620             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3621             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3622         d.addCallback(_after_first_cycle)
3623         d.addCallback(lambda ign: self.render1(webstatus))
3624         def _check_html(html):
3625             s = remove_tags(html)
3626             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3627         d.addCallback(_check_html)
3628         return d
3629
3630     def test_only_mutable(self):
3631         basedir = "storage/LeaseCrawler/only_mutable"
3632         fileutil.make_dirs(basedir)
3633         now = time.time()
3634         then = int(now - 2000)
3635         ss = StorageServer(basedir, "\x00" * 20,
3636                            expiration_enabled=True,
3637                            expiration_mode="cutoff-date",
3638                            expiration_cutoff_date=then,
3639                            expiration_sharetypes=("mutable",))
3640         lc = ss.lease_checker
3641         lc.slow_start = 0
3642         webstatus = StorageStatus(ss)
3643
3644         self.make_shares(ss)
3645         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3646         # backdate all leases so they count as expired (the implied grant time, now-3000, is before the cutoff)
3647         new_expiration_time = now - 3000 + 31*24*60*60
3648
3649         def count_shares(si):
3650             return len(list(ss._iter_share_files(si)))
3651         def _get_sharefile(si):
3652             return list(ss._iter_share_files(si))[0]
3653         def count_leases(si):
3654             return len(list(_get_sharefile(si).get_leases()))
3655
3656         sf0 = _get_sharefile(immutable_si_0)
3657         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3658         sf1 = _get_sharefile(immutable_si_1)
3659         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3660         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3661         sf2 = _get_sharefile(mutable_si_2)
3662         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3663         sf3 = _get_sharefile(mutable_si_3)
3664         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3665         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3666
3667         ss.setServiceParent(self.s)
3668         def _wait():
3669             return lc.get_state()["last-cycle-finished"] is not None
3670         d = self.poll(_wait)
3671
3672         def _after_first_cycle(ignored):
3673             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3674             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3675             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3676             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3677             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3678             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3679         d.addCallback(_after_first_cycle)
3680         d.addCallback(lambda ign: self.render1(webstatus))
3681         def _check_html(html):
3682             s = remove_tags(html)
3683             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3684         d.addCallback(_check_html)
3685         return d
3686
3687     def test_bad_mode(self):
3688         basedir = "storage/LeaseCrawler/bad_mode"
3689         fileutil.make_dirs(basedir)
3690         e = self.failUnlessRaises(ValueError,
3691                                   StorageServer, basedir, "\x00" * 20,
3692                                   expiration_mode="bogus")
3693         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
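             # (for reference, the rejection comes from the lease checker's
             #  configuration; roughly -- a sketch, not the exact code:
             #      if mode not in ("age", "cutoff-date"):
             #          raise ValueError("GC mode '%s' must be 'age' or"
             #                           " 'cutoff-date'" % mode)
             #  )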
3694
3695     def test_parse_duration(self):
3696         DAY = 24*60*60
3697         MONTH = 31*DAY
3698         YEAR = 365*DAY
3699         p = time_format.parse_duration
3700         self.failUnlessEqual(p("7days"), 7*DAY)
3701         self.failUnlessEqual(p("31day"), 31*DAY)
3702         self.failUnlessEqual(p("60 days"), 60*DAY)
3703         self.failUnlessEqual(p("2mo"), 2*MONTH)
3704         self.failUnlessEqual(p("3 month"), 3*MONTH)
3705         self.failUnlessEqual(p("2years"), 2*YEAR)
3706         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3707         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
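             # (roughly what parse_duration does -- a sketch, not the exact
             #  time_format code:
             #      orig = s
             #      if s.endswith("s"):
             #          s = s[:-1]
             #      for suffix, unit in [("day", DAY), ("month", MONTH),
             #                           ("mo", MONTH), ("year", YEAR)]:
             #          if s.endswith(suffix):
             #              return int(s[:-len(suffix)]) * unit
             #      raise ValueError("no unit (like day, month, or year)"
             #                       " in '%s'" % orig)
             #  )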
3708
3709     def test_parse_date(self):
3710         p = time_format.parse_date
3711         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3712         self.failUnlessEqual(p("2009-03-18"), 1237334400)
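             # (cross-check: 2009-03-18 00:00 UTC is 14321 days after the
             #  Unix epoch, and 14321 * 86400 = 1237334400; equivalently,
             #  calendar.timegm(time.strptime("2009-03-18", "%Y-%m-%d"))
             #  yields the same value.)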
3713
3714     def test_limited_history(self):
3715         basedir = "storage/LeaseCrawler/limited_history"
3716         fileutil.make_dirs(basedir)
3717         ss = StorageServer(basedir, "\x00" * 20)
3718         # make it start sooner than usual.
3719         lc = ss.lease_checker
3720         lc.slow_start = 0
3721         lc.cpu_slice = 500
3722
3723         # create a few shares, with some leases on them
3724         self.make_shares(ss)
3725
3726         ss.setServiceParent(self.s)
3727
3728         def _wait_until_15_cycles_done():
3729             last = lc.state["last-cycle-finished"]
3730             if last is not None and last >= 15:
3731                 return True
3732             if lc.timer:
3733                 lc.timer.reset(0)
3734             return False
3735         d = self.poll(_wait_until_15_cycles_done)
3736
3737         def _check(ignored):
3738             s = lc.get_state()
3739             h = s["history"]
3740             self.failUnlessEqual(len(h), 10)
3741             self.failUnlessEqual(max(h.keys()), 15)
3742             self.failUnlessEqual(min(h.keys()), 6)
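                 # (the retention rule under test, roughly -- a sketch, not
                 #  the exact expirer code: when a cycle finishes,
                 #      while len(history) > 10:
                 #          del history[min(history.keys())]
                 #  so by the end of cycle 15, cycles 0-5 have been dropped.)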
3743         d.addCallback(_check)
3744         return d
3745
3746     def test_unpredictable_future(self):
3747         basedir = "storage/LeaseCrawler/unpredictable_future"
3748         fileutil.make_dirs(basedir)
3749         ss = StorageServer(basedir, "\x00" * 20)
3750         # make it start sooner than usual.
3751         lc = ss.lease_checker
3752         lc.slow_start = 0
3753         lc.cpu_slice = -1.0 # stop quickly
3754
3755         self.make_shares(ss)
3756
3757         ss.setServiceParent(self.s)
3758
3759         d = fireEventually()
3760         def _check(ignored):
3761             # this should fire after the first bucket is complete, but before
3762             # the first prefix is complete, so the progress-measurer won't
3763             # think we've gotten far enough to raise our percent-complete
3764             # above 0%, triggering the cannot-predict-the-future code in
3765             # expirer.py. This will have to change if/when the
3766             # progress-measurer gets smart enough to count buckets (we'll
3767             # have to interrupt it even earlier, before it's finished the
3768             # first bucket).
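                 # (the guard being exercised looks roughly like this -- a
                 #  sketch, not the exact expirer code:
                 #      if cycle_complete_percentage > 0.0:
                 #          ...extrapolate whole-cycle and remaining-cycle
                 #          ...predictions from the work done so far
                 #      else:
                 #          ...leave every prediction as None
                 #  )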
3769             s = lc.get_state()
3770             if "cycle-to-date" not in s:
3771                 d2 = fireEventually()
3772                 d2.addCallback(_check)
3773                 return d2
3774             self.failUnlessIn("cycle-to-date", s)
3775             self.failUnlessIn("estimated-remaining-cycle", s)
3776             self.failUnlessIn("estimated-current-cycle", s)
3777
3778             left = s["estimated-remaining-cycle"]["space-recovered"]
3779             self.failUnlessEqual(left["actual-buckets"], None)
3780             self.failUnlessEqual(left["original-buckets"], None)
3781             self.failUnlessEqual(left["configured-buckets"], None)
3782             self.failUnlessEqual(left["actual-shares"], None)
3783             self.failUnlessEqual(left["original-shares"], None)
3784             self.failUnlessEqual(left["configured-shares"], None)
3785             self.failUnlessEqual(left["actual-diskbytes"], None)
3786             self.failUnlessEqual(left["original-diskbytes"], None)
3787             self.failUnlessEqual(left["configured-diskbytes"], None)
3788             self.failUnlessEqual(left["actual-sharebytes"], None)
3789             self.failUnlessEqual(left["original-sharebytes"], None)
3790             self.failUnlessEqual(left["configured-sharebytes"], None)
3791
3792             full = s["estimated-current-cycle"]["space-recovered"]
3793             self.failUnlessEqual(full["actual-buckets"], None)
3794             self.failUnlessEqual(full["original-buckets"], None)
3795             self.failUnlessEqual(full["configured-buckets"], None)
3796             self.failUnlessEqual(full["actual-shares"], None)
3797             self.failUnlessEqual(full["original-shares"], None)
3798             self.failUnlessEqual(full["configured-shares"], None)
3799             self.failUnlessEqual(full["actual-diskbytes"], None)
3800             self.failUnlessEqual(full["original-diskbytes"], None)
3801             self.failUnlessEqual(full["configured-diskbytes"], None)
3802             self.failUnlessEqual(full["actual-sharebytes"], None)
3803             self.failUnlessEqual(full["original-sharebytes"], None)
3804             self.failUnlessEqual(full["configured-sharebytes"], None)
3805
3806         d.addCallback(_check)
3807         return d
3808
3809     def test_no_st_blocks(self):
3810         basedir = "storage/LeaseCrawler/no_st_blocks"
3811         fileutil.make_dirs(basedir)
3812         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3813                                         expiration_mode="age",
3814                                         expiration_override_lease_duration=-1000)
3815         # a negative expiration_override_lease_duration= means every lease
3816         # has already expired, so the "configured-" space-recovered counts
3817         # will be non-zero: the crawler reports that it would reclaim all shares
3818
3819         # make it start sooner than usual.
3820         lc = ss.lease_checker
3821         lc.slow_start = 0
3822
3823         self.make_shares(ss)
3824         ss.setServiceParent(self.s)
3825         def _wait():
3826             return lc.get_state()["last-cycle-finished"] is not None
3827         d = self.poll(_wait)
3828
3829         def _check(ignored):
3830             s = lc.get_state()
3831             last = s["history"][0]
3832             rec = last["space-recovered"]
3833             self.failUnlessEqual(rec["configured-buckets"], 4)
3834             self.failUnlessEqual(rec["configured-shares"], 4)
3835             self.failUnless(rec["configured-sharebytes"] > 0,
3836                             rec["configured-sharebytes"])
3837             # without the .st_blocks field in os.stat() results, we should be
3838             # reporting diskbytes==sharebytes
3839             self.failUnlessEqual(rec["configured-sharebytes"],
3840                                  rec["configured-diskbytes"])
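                 # (roughly, the fallback being exercised -- a sketch, not
                 #  the exact expirer code:
                 #      s = self.stat(sharefile)
                 #      sharebytes = s.st_size
                 #      if hasattr(s, "st_blocks"):
                 #          diskbytes = s.st_blocks * 512
                 #      else:
                 #          diskbytes = sharebytes
                 #  No_ST_BLOCKS_StorageServer, defined earlier in this file,
                 #  strips st_blocks from the stat results to force the else
                 #  branch.)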
3841         d.addCallback(_check)
3842         return d
3843
3844     def test_share_corruption(self):
3845         self._poll_should_ignore_these_errors = [
3846             UnknownMutableContainerVersionError,
3847             UnknownImmutableContainerVersionError,
3848             ]
3849         basedir = "storage/LeaseCrawler/share_corruption"
3850         fileutil.make_dirs(basedir)
3851         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3852         w = StorageStatus(ss)
3853         # make it start sooner than usual.
3854         lc = ss.lease_checker
3855         lc.stop_after_first_bucket = True
3856         lc.slow_start = 0
3857         lc.cpu_slice = 500
3858
3859         # create a few shares, with some leases on them
3860         self.make_shares(ss)
3861
3862         # now corrupt one, and make sure the lease-checker keeps going
3863         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3864         first = min(self.sis)
3865         first_b32 = base32.b2a(first)
3866         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3867         f = open(fn, "rb+")
3868         f.seek(0)
3869         f.write("BAD MAGIC")
3870         f.close()
3871         # if get_share_file() doesn't see the correct mutable magic, it
3872         # assumes the file is an immutable share, and then
3873         # immutable.ShareFile sees a bad version. So regardless of which kind
3874         # of share we corrupted, this will trigger an
3875         # UnknownImmutableContainerVersionError.
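             # (for reference, that dispatch looks roughly like this -- a
             #  sketch of get_share_file(), not the exact code:
             #      prefix = open(filename, "rb").read(32)
             #      if prefix == MutableShareFile.MAGIC:
             #          return MutableShareFile(filename)
             #      return ShareFile(filename)  # raises the immutable
             #                                  # version error on bad data
             #  )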
3876
3877         # also create an empty bucket
3878         empty_si = base32.b2a("\x04"*16)
3879         empty_bucket_dir = os.path.join(ss.sharedir,
3880                                         storage_index_to_dir(empty_si))
3881         fileutil.make_dirs(empty_bucket_dir)
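             # (note: empty_si is already base32, and storage_index_to_dir()
             #  base32-encodes its argument again; that is harmless here,
             #  since the crawler only needs some directory under a valid
             #  two-letter prefix to count it as a bucket -- this is the
             #  fifth examined-bucket asserted below.)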
3882
3883         ss.setServiceParent(self.s)
3884
3885         d = fireEventually()
3886
3887         # now examine the state right after the first bucket has been
3888         # processed.
3889         def _after_first_bucket(ignored):
3890             s = lc.get_state()
3891             if "cycle-to-date" not in s:
3892                 d2 = fireEventually()
3893                 d2.addCallback(_after_first_bucket)
3894                 return d2
3895             so_far = s["cycle-to-date"]
3896             rec = so_far["space-recovered"]
3897             self.failUnlessEqual(rec["examined-buckets"], 1)
3898             self.failUnlessEqual(rec["examined-shares"], 0)
3899             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3900         d.addCallback(_after_first_bucket)
3901
3902         d.addCallback(lambda ign: self.render_json(w))
3903         def _check_json(json):
3904             data = simplejson.loads(json)
3905             # grr. json turns all dict keys into strings.
3906             so_far = data["lease-checker"]["cycle-to-date"]
3907             corrupt_shares = so_far["corrupt-shares"]
3908             # it also turns all tuples into lists
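                 # e.g. simplejson.loads(simplejson.dumps({1: (2, 3)}))
                 # comes back as {"1": [2, 3]}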
3909             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3910         d.addCallback(_check_json)
3911         d.addCallback(lambda ign: self.render1(w))
3912         def _check_html(html):
3913             s = remove_tags(html)
3914             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3915         d.addCallback(_check_html)
3916
3917         def _wait():
3918             return lc.get_state()["last-cycle-finished"] is not None
3919         d.addCallback(lambda ign: self.poll(_wait))
3920
3921         def _after_first_cycle(ignored):
3922             s = lc.get_state()
3923             last = s["history"][0]
3924             rec = last["space-recovered"]
3925             self.failUnlessEqual(rec["examined-buckets"], 5)
3926             self.failUnlessEqual(rec["examined-shares"], 3)
3927             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3928         d.addCallback(_after_first_cycle)
3929         d.addCallback(lambda ign: self.render_json(w))
3930         def _check_json_history(json):
3931             data = simplejson.loads(json)
3932             last = data["lease-checker"]["history"]["0"]
3933             corrupt_shares = last["corrupt-shares"]
3934             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3935         d.addCallback(_check_json_history)
3936         d.addCallback(lambda ign: self.render1(w))
3937         def _check_html_history(html):
3938             s = remove_tags(html)
3939             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3940         d.addCallback(_check_html_history)
3941
3942         def _cleanup(res):
3943             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3944                                    UnknownImmutableContainerVersionError)
3945             return res
3946         d.addBoth(_cleanup)
3947         return d
3948
3949     def render_json(self, page):
3950         d = self.render1(page, args={"t": ["json"]})
3951         return d
3952
3953 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3954
3955     def setUp(self):
3956         self.s = service.MultiService()
3957         self.s.startService()
3958     def tearDown(self):
3959         return self.s.stopService()
3960
3961     def test_no_server(self):
3962         w = StorageStatus(None)
3963         html = w.renderSynchronously()
3964         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3965
3966     def test_status(self):
3967         basedir = "storage/WebStatus/status"
3968         fileutil.make_dirs(basedir)
3969         ss = StorageServer(basedir, "\x00" * 20)
3970         ss.setServiceParent(self.s)
3971         w = StorageStatus(ss)
3972         d = self.render1(w)
3973         def _check_html(html):
3974             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3975             s = remove_tags(html)
3976             self.failUnlessIn("Accepting new shares: Yes", s)
3977             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3978         d.addCallback(_check_html)
3979         d.addCallback(lambda ign: self.render_json(w))
3980         def _check_json(json):
3981             data = simplejson.loads(json)
3982             s = data["stats"]
3983             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3984             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3985             self.failUnlessIn("bucket-counter", data)
3986             self.failUnlessIn("lease-checker", data)
3987         d.addCallback(_check_json)
3988         return d
3989
3990     def render_json(self, page):
3991         d = self.render1(page, args={"t": ["json"]})
3992         return d
3993
3994     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3995     def test_status_no_disk_stats(self, mock_get_disk_stats):
3996         mock_get_disk_stats.side_effect = AttributeError()
3997
3998         # Some platforms may have no disk stats API. Make sure the code can handle that
3999         # (test runs on all platforms).
4000         basedir = "storage/WebStatus/status_no_disk_stats"
4001         fileutil.make_dirs(basedir)
4002         ss = StorageServer(basedir, "\x00" * 20)
4003         ss.setServiceParent(self.s)
4004         w = StorageStatus(ss)
4005         html = w.renderSynchronously()
4006         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4007         s = remove_tags(html)
4008         self.failUnlessIn("Accepting new shares: Yes", s)
4009         self.failUnlessIn("Total disk space: ?", s)
4010         self.failUnlessIn("Space Available to Tahoe: ?", s)
4011         self.failUnless(ss.get_available_space() is None)
4012
4013     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4014     def test_status_bad_disk_stats(self, mock_get_disk_stats):
4015         mock_get_disk_stats.side_effect = OSError()
4016
4017         # If the API to get disk stats exists but a call to it fails, then the status should
4018         # show that no shares will be accepted, and get_available_space() should be 0.
4019         basedir = "storage/WebStatus/status_bad_disk_stats"
4020         fileutil.make_dirs(basedir)
4021         ss = StorageServer(basedir, "\x00" * 20)
4022         ss.setServiceParent(self.s)
4023         w = StorageStatus(ss)
4024         html = w.renderSynchronously()
4025         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4026         s = remove_tags(html)
4027         self.failUnlessIn("Accepting new shares: No", s)
4028         self.failUnlessIn("Total disk space: ?", s)
4029         self.failUnlessIn("Space Available to Tahoe: ?", s)
4030         self.failUnlessEqual(ss.get_available_space(), 0)
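             # (this test and test_status_no_disk_stats above exercise a
             #  fallback that looks roughly like this -- a sketch of
             #  fileutil.get_available_space, not the exact code:
             #      try:
             #          return get_disk_stats(whichdir, reserved_space)['avail']
             #      except AttributeError:
             #          return None   # no disk-stats API on this platform
             #      except EnvironmentError:
             #          return 0      # the API exists but the call failed
             #  )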
4031
4032     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4033     def test_status_right_disk_stats(self, mock_get_disk_stats):
4034         GB = 1000000000
4035         total            = 5*GB
4036         free_for_root    = 4*GB
4037         free_for_nonroot = 3*GB
4038         reserved_space   = 1*GB
4039         used = total - free_for_root
4040         avail = max(free_for_nonroot - reserved_space, 0)
4041         mock_get_disk_stats.return_value = {
4042             'total': total,
4043             'free_for_root': free_for_root,
4044             'free_for_nonroot': free_for_nonroot,
4045             'used': used,
4046             'avail': avail,
4047         }
4048
4049         basedir = "storage/WebStatus/status_right_disk_stats"
4050         fileutil.make_dirs(basedir)
4051         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4052         expecteddir = ss.sharedir
4053         ss.setServiceParent(self.s)
4054         w = StorageStatus(ss)
4055         html = w.renderSynchronously()
4056
4057         self.failUnless(all(args == ((expecteddir, reserved_space), {}) for args in mock_get_disk_stats.call_args_list),
4058                         mock_get_disk_stats.call_args_list)
4059
4060         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4061         s = remove_tags(html)
4062         self.failUnlessIn("Total disk space: 5.00 GB", s)
4063         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4064         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4065         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4066         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4067         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4068         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4069
4070     def test_readonly(self):
4071         basedir = "storage/WebStatus/readonly"
4072         fileutil.make_dirs(basedir)
4073         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4074         ss.setServiceParent(self.s)
4075         w = StorageStatus(ss)
4076         html = w.renderSynchronously()
4077         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4078         s = remove_tags(html)
4079         self.failUnlessIn("Accepting new shares: No", s)
4080
4081     def test_reserved(self):
4082         basedir = "storage/WebStatus/reserved"
4083         fileutil.make_dirs(basedir)
4084         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4085         ss.setServiceParent(self.s)
4086         w = StorageStatus(ss)
4087         html = w.renderSynchronously()
4088         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4089         s = remove_tags(html)
4090         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4091
4092     def test_huge_reserved(self):
4093         basedir = "storage/WebStatus/huge_reserved"
4094         fileutil.make_dirs(basedir)
4095         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e9)
4096         ss.setServiceParent(self.s)
4097         w = StorageStatus(ss)
4098         html = w.renderSynchronously()
4099         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4100         s = remove_tags(html)
4101         self.failUnlessIn("Reserved space: - 10.00 GB (10000000000)", s)
4102
4103     def test_util(self):
4104         w = StorageStatus(None)
4105         self.failUnlessEqual(w.render_space(None, None), "?")
4106         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4107         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4108         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4109         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4110         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
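             # (remove_prefix itself is tiny -- roughly:
             #      def remove_prefix(s, prefix):
             #          if not s.startswith(prefix):
             #              return None
             #          return s[len(prefix):]
             #  )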