import time, os.path, platform, stat, re, simplejson, struct, shutil

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
                                     SIGNED_PREFIX, MDMFHEADER, \
                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
                                     PRIVATE_KEY_SIZE, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
class FakeCanary:
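    # A minimal stand-in for the foolscap "canary" object through which the
    # storage server learns about client disconnection: it just records the
    # registered callbacks so a test can fire them by hand (see
    # Server.test_disconnect below).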
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
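        # The trailing "\x00" * 20 argument to LeaseInfo is presumably the
        # 20-byte server node id; an all-zeros placeholder is fine here.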
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data
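        # Sanity-check the 85-byte figure quoted above: a 12-byte container
        # header ('>LLL'), 1 byte of share data, and a 72-byte lease
        # (4 + 32 + 32 + 4).
        assert len(share_file_data) == 85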

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        mockstorageserver = mock.Mock()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)

class RemoteBucket:
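    # A minimal stand-in for a foolscap RemoteReference: callRemote("foo",
    # ...) is forwarded to self.target.remote_foo(...), the result is
    # wrapped in a Deferred, and read/write traffic is counted so tests can
    # assert on the number of remote round trips.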

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
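        # Sanity-check the arithmetic in the comment above (header_size is
        # 0x24 == 36 for the v1 layout and 0x44 == 68 for v2):
        #   36 + 100 + 3*224 + 3*34 + 504 == 1414
        #   68 + 100 + 3*224 + 3*34 + 504 == 1446
        assert sharesize in (1414, 1446)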

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)

    def test_declares_maximum_share_sizes(self):
        ss = self.create("test_declares_maximum_share_sizes")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnlessIn('maximum-immutable-share-size', sv1)
        self.failUnlessIn('maximum-mutable-share-size', sv1)

    def test_declares_available_space(self):
        ss = self.create("test_declares_available_space")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnlessIn('available-space', sv1)

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
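        # remote_allocate_buckets returns a pair (alreadygot, bucketwriters):
        # the set of share numbers the server already holds for this storage
        # index, and a dict mapping each newly allocated share number to its
        # writer.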
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems that lack efficient sparse-file support (Mac OS X and Windows don't have it).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
375         """
376         This test asserts that if you add a second share whose storage index
377         share lots of leading bits with an extant share (but isn't the exact
378         same storage index), this won't add an entry to the share directory.
379         """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
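        # The repr should name the class and include the abbreviated base32
        # storage index ("mfwgy33dmf2g" is the first 12 characters of the
        # base32 encoding of "allocate") plus the share number.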
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
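        # (OVERHEAD is the three 4-byte fields of the immutable container
        # header -- cf. the '>LLL' pack in test_read_past_end_of_share_data
        # above -- and LEASE_SIZE is owner number + renew secret + cancel
        # secret + expiration time: 4+32+32+4 = 72 bytes.)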
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become a real, long-term allocation, which grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.failIf(hasattr(ss, 'remote_cancel_lease'), \
                        "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
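        # slot_testv_and_readv_and_writev takes a dict mapping each sharenum
        # to a (testv, writev, new_length) tuple plus a read vector, and
        # returns (wrote, read_data): wrote is True only if every test
        # vector matched, and read_data holds the data read *before* any
        # writes were applied (cf. test_operators below, where a successful
        # write still returns the old contents).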
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # Trying to make the container too large (by sending a write vector
        # whose offset is too high) will raise an exception.
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(TOOBIG,data)], None)},
                              [])

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], None)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:10]]})

        # Sending a new_length shorter than the current length truncates the
        # data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 9)},
                         [])
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a new_length longer than the current length doesn't change
        # the data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        read_answer = read("si1", [0], [(0, 20)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a write vector whose start is after the end of the current
        # data doesn't reveal "whatever was there last time" (palimpsest),
        # but instead fills with zeroes.

        # To test this, we fill the data area with a recognizable pattern.
        pattern = ''.join([chr(i) for i in range(100)])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0, pattern)], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Then truncate the data...
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Just confirm that you get an empty string if you try to read from
        # past the (new) endpoint now.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 1980)])
        self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))

        # Then extend the file by writing a vector which starts past the
        # end...
        answer = rstaraw("si1", secrets,
                         {0: ([], [(50, 'hellothere')], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Now if you read the stuff between 20 (where we earlier truncated)
        # and 50, it had better be all zeroes.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 30)])
        self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))

        # Also see if the server explicitly declares that it supports this
        # feature.
        ver = ss.remote_get_version()
        storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
        self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))

        # If the size is dropped to zero the share is deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

1254         # re-allocate the slots and use the same secrets, that should update
1255         # the lease
1256         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1257         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1258
1259         # renew it directly
1260         ss.remote_renew_lease("si1", secrets(0)[1])
1261         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1262
1263         # now allocate them with a bunch of different secrets, to trigger the
1264         # extended lease code. Use add_lease for one of them.
1265         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1266         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1267         secrets2 = secrets(2)
1268         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1269         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1270         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1271         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1272         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1273
1274         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1275
1276         all_leases = list(s0.get_leases())
1277         # and write enough data to expand the container, forcing the server
1278         # to move the leases
1279         write("si1", secrets(0),
1280               {0: ([], [(0,data)], 200), },
1281               [])
1282
1283         # read back the leases, make sure they're still intact.
1284         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1285
1286         ss.remote_renew_lease("si1", secrets(0)[1])
1287         ss.remote_renew_lease("si1", secrets(1)[1])
1288         ss.remote_renew_lease("si1", secrets(2)[1])
1289         ss.remote_renew_lease("si1", secrets(3)[1])
1290         ss.remote_renew_lease("si1", secrets(4)[1])
1291         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1292         # get a new copy of the leases, with the current timestamps. Reading
1293         # data and failing to renew/cancel leases should leave the timestamps
1294         # alone.
1295         all_leases = list(s0.get_leases())
1296         # renewing with a bogus token should prompt an error message
1297
1298         # examine the exception thus raised, make sure the old nodeid is
1299         # present, to provide for share migration
1300         e = self.failUnlessRaises(IndexError,
1301                                   ss.remote_renew_lease, "si1",
1302                                   secrets(20)[1])
1303         e_s = str(e)
1304         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1305         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1306         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1307
1308         self.compare_leases(all_leases, list(s0.get_leases()))
1309
1310         # reading shares should not modify the timestamp
1311         read("si1", [], [(0,200)])
1312         self.compare_leases(all_leases, list(s0.get_leases()))
1313
1314         write("si1", secrets(0),
1315               {0: ([], [(200, "make me bigger")], None)}, [])
1316         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1317
1318         write("si1", secrets(0),
1319               {0: ([], [(500, "make me really bigger")], None)}, [])
1320         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1321
1322     def test_remove(self):
1323         ss = self.create("test_remove")
1324         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1325                       set([0,1,2]), 100)
1326         readv = ss.remote_slot_readv
1327         writev = ss.remote_slot_testv_and_readv_and_writev
1328         secrets = ( self.write_enabler("we1"),
1329                     self.renew_secret("we1"),
1330                     self.cancel_secret("we1") )
1331         # delete sh0 by setting its size to zero
1332         answer = writev("si1", secrets,
1333                         {0: ([], [], 0)},
1334                         [])
1335         # the answer should mention all the shares that existed before the
1336         # write
1337         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1338         # but a new read should show only sh1 and sh2
1339         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1340                              {1: [""], 2: [""]})
1341
1342         # delete sh1 by setting its size to zero
1343         answer = writev("si1", secrets,
1344                         {1: ([], [], 0)},
1345                         [])
1346         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1347         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1348                              {2: [""]})
1349
1350         # delete sh2 by setting its size to zero
1351         answer = writev("si1", secrets,
1352                         {2: ([], [], 0)},
1353                         [])
1354         self.failUnlessEqual(answer, (True, {2:[]}) )
1355         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1356                              {})
1357         # and the bucket directory should now be gone
1358         si = base32.b2a("si1")
1359         # note: this is a detail of the storage server implementation, and
1360         # may change in the future
1361         prefix = si[:2]
1362         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1363         bucketdir = os.path.join(prefixdir, si)
1364         self.failUnless(os.path.exists(prefixdir), prefixdir)
1365         self.failIf(os.path.exists(bucketdir), bucketdir)
1366
1367
1368 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1369     def setUp(self):
1370         self.sparent = LoggingServiceParent()
1371         self._lease_secret = itertools.count()
1372         self.ss = self.create("MDMFProxies storage test server")
1373         self.rref = RemoteBucket()
1374         self.rref.target = self.ss
1375         self.secrets = (self.write_enabler("we_secret"),
1376                         self.renew_secret("renew_secret"),
1377                         self.cancel_secret("cancel_secret"))
1378         self.segment = "aaaaaa"
1379         self.block = "aa"
1380         self.salt = "a" * 16
1381         self.block_hash = "a" * 32
1382         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1383         self.share_hash = self.block_hash
1384         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1385         self.signature = "foobarbaz"
1386         self.verification_key = "vvvvvv"
1387         self.encprivkey = "private"
1388         self.root_hash = self.block_hash
1389         self.salt_hash = self.root_hash
1390         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1391         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1392         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1393         # blockhashes and salt hashes are serialized in the same way,
1394         # except that we lop off the first element and store it in
1395         # the header.
1396         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1397
1398
1399     def tearDown(self):
1400         self.sparent.stopService()
1401         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1402
1403
1404     def write_enabler(self, we_tag):
1405         return hashutil.tagged_hash("we_blah", we_tag)
1406
1407
1408     def renew_secret(self, tag):
1409         return hashutil.tagged_hash("renew_blah", str(tag))
1410
1411
1412     def cancel_secret(self, tag):
1413         return hashutil.tagged_hash("cancel_blah", str(tag))
1414
1415
1416     def workdir(self, name):
1417         basedir = os.path.join("storage", "MutableServer", name)
1418         return basedir
1419
1420
1421     def create(self, name):
1422         workdir = self.workdir(name)
1423         ss = StorageServer(workdir, "\x00" * 20)
1424         ss.setServiceParent(self.sparent)
1425         return ss
1426
1427
1428     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1429         # Start with the checkstring
1430         data = struct.pack(">BQ32s",
1431                            1,
1432                            0,
1433                            self.root_hash)
1434         self.checkstring = data
1435         # Next, the encoding parameters
1436         if tail_segment:
1437             data += struct.pack(">BBQQ",
1438                                 3,
1439                                 10,
1440                                 6,
1441                                 33)
1442         elif empty:
1443             data += struct.pack(">BBQQ",
1444                                 3,
1445                                 10,
1446                                 0,
1447                                 0)
1448         else:
1449             data += struct.pack(">BBQQ",
1450                                 3,
1451                                 10,
1452                                 6,
1453                                 36)
1454         # Now we'll build the share data, and then the offset table.
1455         sharedata = ""
1456         if not tail_segment and not empty:
1457             for i in xrange(6):
1458                 sharedata += self.salt + self.block
1459         elif tail_segment:
1460             for i in xrange(5):
1461                 sharedata += self.salt + self.block
1462             sharedata += self.salt + "a"
1463
1464         # The encrypted private key comes after the shares + salts
1465         offset_size = struct.calcsize(MDMFOFFSETS)
1466         encrypted_private_key_offset = len(data) + offset_size
1467         # The share hash chain comes after the private key
1468         sharehashes_offset = encrypted_private_key_offset + \
1469             len(self.encprivkey)
1470
1471         # The signature comes after the share hash chain.
1472         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1473
1474         verification_key_offset = signature_offset + len(self.signature)
1475         verification_key_end = verification_key_offset + \
1476             len(self.verification_key)
1477
1478         share_data_offset = offset_size
1479         share_data_offset += PRIVATE_KEY_SIZE
1480         share_data_offset += SIGNATURE_SIZE
1481         share_data_offset += VERIFICATION_KEY_SIZE
1482         share_data_offset += SHARE_HASH_CHAIN_SIZE
1483
1484         blockhashes_offset = share_data_offset + len(sharedata)
1485         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1486
1487         data += struct.pack(MDMFOFFSETS,
1488                             encrypted_private_key_offset,
1489                             sharehashes_offset,
1490                             signature_offset,
1491                             verification_key_offset,
1492                             verification_key_end,
1493                             share_data_offset,
1494                             blockhashes_offset,
1495                             eof_offset)
1496
1497         self.offsets = {}
1498         self.offsets['enc_privkey'] = encrypted_private_key_offset
1499         self.offsets['block_hash_tree'] = blockhashes_offset
1500         self.offsets['share_hash_chain'] = sharehashes_offset
1501         self.offsets['signature'] = signature_offset
1502         self.offsets['verification_key'] = verification_key_offset
1503         self.offsets['share_data'] = share_data_offset
1504         self.offsets['verification_key_end'] = verification_key_end
1505         self.offsets['EOF'] = eof_offset
1506
1507         # the private key,
1508         data += self.encprivkey
1509         # the sharehashes
1510         data += self.share_hash_chain_s
1511         # the signature,
1512         data += self.signature
1513         # and the verification key
1514         data += self.verification_key
1515         # Then pad with spaces until we reach the share data offset.
1516         padding = " " * (share_data_offset - len(data))
1517         data += padding
1518
1519         # Then the share data
1520         data += sharedata
1521         # the blockhashes
1522         data += self.block_hash_tree_s
1523         return data
1524
1525
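    # Sketch companion to build_test_mdmf_share (an illustrative addition,
    # not part of the original suite): unpack the header fields packed
    # above. Field widths follow the pack calls: ">BQ32s" occupies 41
    # bytes and ">BBQQ" occupies the next 18.
    def _example_parse_mdmf_header(self, data):
        # version (1 byte), sequence number (8), root hash (32)
        version, seqnum, root_hash = struct.unpack(">BQ32s", data[:41])
        # k (1 byte), N (1 byte), segment size (8), data length (8)
        k, n, segsize, datalen = struct.unpack(">BBQQ", data[41:59])
        return (version, seqnum, root_hash, k, n, segsize, datalen)

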
1526     def write_test_share_to_server(self,
1527                                    storage_index,
1528                                    tail_segment=False,
1529                                    empty=False):
1530         """
1531         I write some test data to self.ss for the read tests to read.
1532 
1533         If tail_segment=True, I will write a share whose tail segment
1534         is smaller than the other segments.
1535         """
1536         write = self.ss.remote_slot_testv_and_readv_and_writev
1537         data = self.build_test_mdmf_share(tail_segment, empty)
1538         # Finally, we write the whole thing to the storage server in one
1539         # pass.
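        # Each test vector is (offset, length, operator, specimen): the
        # write proceeds only if reading `length` bytes at `offset`
        # compares `operator` against `specimen`. Reading a missing share
        # yields "", so this testv requires that the share not exist yet.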
1540         testvs = [(0, 1, "eq", "")]
1541         tws = {}
1542         tws[0] = (testvs, [(0, data)], None)
1543         readv = [(0, 1)]
1544         results = write(storage_index, self.secrets, tws, readv)
1545         self.failUnless(results[0])
1546
1547
1548     def build_test_sdmf_share(self, empty=False):
1549         if empty:
1550             sharedata = ""
1551         else:
1552             sharedata = self.segment * 6
1553         self.sharedata = sharedata
1554         blocksize = len(sharedata) / 3
1555         block = sharedata[:blocksize]
1556         self.blockdata = block
1557         prefix = struct.pack(">BQ32s16s BBQQ",
1558                              0, # version,
1559                              0,
1560                              self.root_hash,
1561                              self.salt,
1562                              3,
1563                              10,
1564                              len(sharedata),
1565                              len(sharedata),
1566                             )
1567         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1568         signature_offset = post_offset + len(self.verification_key)
1569         sharehashes_offset = signature_offset + len(self.signature)
1570         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1571         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1572         encprivkey_offset = sharedata_offset + len(block)
1573         eof_offset = encprivkey_offset + len(self.encprivkey)
1574         offsets = struct.pack(">LLLLQQ",
1575                               signature_offset,
1576                               sharehashes_offset,
1577                               blockhashes_offset,
1578                               sharedata_offset,
1579                               encprivkey_offset,
1580                               eof_offset)
1581         final_share = "".join([prefix,
1582                                offsets,
1583                                self.verification_key,
1584                                self.signature,
1585                                self.share_hash_chain_s,
1586                                self.block_hash_tree_s,
1587                                block,
1588                                self.encprivkey])
1589         self.offsets = {}
1590         self.offsets['signature'] = signature_offset
1591         self.offsets['share_hash_chain'] = sharehashes_offset
1592         self.offsets['block_hash_tree'] = blockhashes_offset
1593         self.offsets['share_data'] = sharedata_offset
1594         self.offsets['enc_privkey'] = encprivkey_offset
1595         self.offsets['EOF'] = eof_offset
1596         return final_share
1597
1598
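    # Sketch companion to build_test_sdmf_share (an illustrative addition):
    # unpack the version-0 prefix and offset table packed above. The
    # prefix format ">BQ32s16sBBQQ" is 75 bytes; the ">LLLLQQ" offset
    # table adds another 32.
    def _example_parse_sdmf_header(self, data):
        prefix_size = struct.calcsize(">BQ32s16sBBQQ")
        (version, seqnum, root_hash, salt,
         k, n, segsize, datalen) = struct.unpack(">BQ32s16sBBQQ",
                                                 data[:prefix_size])
        offsets = struct.unpack(">LLLLQQ",
                                data[prefix_size:prefix_size + 32])
        return (version, seqnum, root_hash, salt, k, n,
                segsize, datalen, offsets)

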
1599     def write_sdmf_share_to_server(self,
1600                                    storage_index,
1601                                    empty=False):
1602         # Some tests need SDMF shares to verify that we can still
1603         # read them. This method writes one which resembles, but is not identical to, a real SDMF share.
1604         assert self.rref
1605         write = self.ss.remote_slot_testv_and_readv_and_writev
1606         share = self.build_test_sdmf_share(empty)
1607         testvs = [(0, 1, "eq", "")]
1608         tws = {}
1609         tws[0] = (testvs, [(0, share)], None)
1610         readv = []
1611         results = write(storage_index, self.secrets, tws, readv)
1612         self.failUnless(results[0])
1613
1614
1615     def test_read(self):
1616         self.write_test_share_to_server("si1")
1617         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1618         # Check that each read method returns what we expect it to.
1619         d = defer.succeed(None)
1620         def _check_block_and_salt((block, salt)):
1621             self.failUnlessEqual(block, self.block)
1622             self.failUnlessEqual(salt, self.salt)
1623
1624         for i in xrange(6):
1625             d.addCallback(lambda ignored, i=i:
1626                 mr.get_block_and_salt(i))
1627             d.addCallback(_check_block_and_salt)
1628
1629         d.addCallback(lambda ignored:
1630             mr.get_encprivkey())
1631         d.addCallback(lambda encprivkey:
1632             self.failUnlessEqual(self.encprivkey, encprivkey))
1633
1634         d.addCallback(lambda ignored:
1635             mr.get_blockhashes())
1636         d.addCallback(lambda blockhashes:
1637             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1638
1639         d.addCallback(lambda ignored:
1640             mr.get_sharehashes())
1641         d.addCallback(lambda sharehashes:
1642             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1643
1644         d.addCallback(lambda ignored:
1645             mr.get_signature())
1646         d.addCallback(lambda signature:
1647             self.failUnlessEqual(signature, self.signature))
1648
1649         d.addCallback(lambda ignored:
1650             mr.get_verification_key())
1651         d.addCallback(lambda verification_key:
1652             self.failUnlessEqual(verification_key, self.verification_key))
1653
1654         d.addCallback(lambda ignored:
1655             mr.get_seqnum())
1656         d.addCallback(lambda seqnum:
1657             self.failUnlessEqual(seqnum, 0))
1658
1659         d.addCallback(lambda ignored:
1660             mr.get_root_hash())
1661         d.addCallback(lambda root_hash:
1662             self.failUnlessEqual(self.root_hash, root_hash))
1663
1664         d.addCallback(lambda ignored:
1665             mr.get_seqnum())
1666         d.addCallback(lambda seqnum:
1667             self.failUnlessEqual(0, seqnum))
1668
1669         d.addCallback(lambda ignored:
1670             mr.get_encoding_parameters())
1671         def _check_encoding_parameters((k, n, segsize, datalen)):
1672             self.failUnlessEqual(k, 3)
1673             self.failUnlessEqual(n, 10)
1674             self.failUnlessEqual(segsize, 6)
1675             self.failUnlessEqual(datalen, 36)
1676         d.addCallback(_check_encoding_parameters)
1677
1678         d.addCallback(lambda ignored:
1679             mr.get_checkstring())
1680         d.addCallback(lambda checkstring:
1681             self.failUnlessEqual(checkstring, self.checkstring))
1682         return d
1683
1684
1685     def test_read_with_different_tail_segment_size(self):
1686         self.write_test_share_to_server("si1", tail_segment=True)
1687         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1688         d = mr.get_block_and_salt(5)
1689         def _check_tail_segment(results):
1690             block, salt = results
1691             self.failUnlessEqual(len(block), 1)
1692             self.failUnlessEqual(block, "a")
1693         d.addCallback(_check_tail_segment)
1694         return d
1695
1696
1697     def test_get_block_with_invalid_segnum(self):
1698         self.write_test_share_to_server("si1")
1699         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1700         d = defer.succeed(None)
1701         d.addCallback(lambda ignored:
1702             self.shouldFail(LayoutInvalid, "test invalid segnum",
1703                             None,
1704                             mr.get_block_and_salt, 7))
1705         return d
1706
1707
1708     def test_get_encoding_parameters_first(self):
1709         self.write_test_share_to_server("si1")
1710         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1711         d = mr.get_encoding_parameters()
1712         def _check_encoding_parameters((k, n, segment_size, datalen)):
1713             self.failUnlessEqual(k, 3)
1714             self.failUnlessEqual(n, 10)
1715             self.failUnlessEqual(segment_size, 6)
1716             self.failUnlessEqual(datalen, 36)
1717         d.addCallback(_check_encoding_parameters)
1718         return d
1719
1720
1721     def test_get_seqnum_first(self):
1722         self.write_test_share_to_server("si1")
1723         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1724         d = mr.get_seqnum()
1725         d.addCallback(lambda seqnum:
1726             self.failUnlessEqual(seqnum, 0))
1727         return d
1728
1729
1730     def test_get_root_hash_first(self):
1731         self.write_test_share_to_server("si1")
1732         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1733         d = mr.get_root_hash()
1734         d.addCallback(lambda root_hash:
1735             self.failUnlessEqual(root_hash, self.root_hash))
1736         return d
1737
1738
1739     def test_get_checkstring_first(self):
1740         self.write_test_share_to_server("si1")
1741         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1742         d = mr.get_checkstring()
1743         d.addCallback(lambda checkstring:
1744             self.failUnlessEqual(checkstring, self.checkstring))
1745         return d
1746
1747
1748     def test_write_read_vectors(self):
1749         # When we write, the storage server returns a read vector
1750         # along with the result. If a write fails because the test
1751         # vectors failed, this read vector can help us diagnose the
1752         # problem. This test ensures that the read vector is working
1753         # appropriately.
1754         mw = self._make_new_mw("si1", 0)
1755
1756         for i in xrange(6):
1757             mw.put_block(self.block, i, self.salt)
1758         mw.put_encprivkey(self.encprivkey)
1759         mw.put_blockhashes(self.block_hash_tree)
1760         mw.put_sharehashes(self.share_hash_chain)
1761         mw.put_root_hash(self.root_hash)
1762         mw.put_signature(self.signature)
1763         mw.put_verification_key(self.verification_key)
1764         d = mw.finish_publishing()
1765         def _then(results):
1766             self.failUnlessEqual(len(results), 2)
1767             result, readv = results
1768             self.failUnless(result)
1769             self.failIf(readv)
1770             self.old_checkstring = mw.get_checkstring()
1771             mw.set_checkstring("")
1772         d.addCallback(_then)
1773         d.addCallback(lambda ignored:
1774             mw.finish_publishing())
1775         def _then_again(results):
1776             self.failUnlessEqual(len(results), 2)
1777             result, readvs = results
1778             self.failIf(result)
1779             self.failUnlessIn(0, readvs)
1780             readv = readvs[0][0]
1781             self.failUnlessEqual(readv, self.old_checkstring)
1782         d.addCallback(_then_again)
1783         # The checkstring remains the same for the rest of the process.
1784         return d
1785
1786
1787     def test_private_key_after_share_hash_chain(self):
1788         mw = self._make_new_mw("si1", 0)
1789         d = defer.succeed(None)
1790         for i in xrange(6):
1791             d.addCallback(lambda ignored, i=i:
1792                 mw.put_block(self.block, i, self.salt))
1793         d.addCallback(lambda ignored:
1794             mw.put_encprivkey(self.encprivkey))
1795         d.addCallback(lambda ignored:
1796             mw.put_sharehashes(self.share_hash_chain))
1797
1798         # Now try to put the private key again.
1799         d.addCallback(lambda ignored:
1800             self.shouldFail(LayoutInvalid, "test repeat private key",
1801                             None,
1802                             mw.put_encprivkey, self.encprivkey))
1803         return d
1804
1805
1806     def test_signature_after_verification_key(self):
1807         mw = self._make_new_mw("si1", 0)
1808         d = defer.succeed(None)
1809         # Put everything up to and including the verification key.
1810         for i in xrange(6):
1811             d.addCallback(lambda ignored, i=i:
1812                 mw.put_block(self.block, i, self.salt))
1813         d.addCallback(lambda ignored:
1814             mw.put_encprivkey(self.encprivkey))
1815         d.addCallback(lambda ignored:
1816             mw.put_blockhashes(self.block_hash_tree))
1817         d.addCallback(lambda ignored:
1818             mw.put_sharehashes(self.share_hash_chain))
1819         d.addCallback(lambda ignored:
1820             mw.put_root_hash(self.root_hash))
1821         d.addCallback(lambda ignored:
1822             mw.put_signature(self.signature))
1823         d.addCallback(lambda ignored:
1824             mw.put_verification_key(self.verification_key))
1825         # Now try to put the signature again. This should fail
1826         d.addCallback(lambda ignored:
1827             self.shouldFail(LayoutInvalid, "signature after verification",
1828                             None,
1829                             mw.put_signature, self.signature))
1830         return d
1831
1832
1833     def test_uncoordinated_write(self):
1834         # Make two mutable writers, both pointing to the same storage
1835         # server, both at the same storage index, and try writing to the
1836         # same share.
1837         mw1 = self._make_new_mw("si1", 0)
1838         mw2 = self._make_new_mw("si1", 0)
1839
1840         def _check_success(results):
1841             result, readvs = results
1842             self.failUnless(result)
1843
1844         def _check_failure(results):
1845             result, readvs = results
1846             self.failIf(result)
1847
1848         def _write_share(mw):
1849             for i in xrange(6):
1850                 mw.put_block(self.block, i, self.salt)
1851             mw.put_encprivkey(self.encprivkey)
1852             mw.put_blockhashes(self.block_hash_tree)
1853             mw.put_sharehashes(self.share_hash_chain)
1854             mw.put_root_hash(self.root_hash)
1855             mw.put_signature(self.signature)
1856             mw.put_verification_key(self.verification_key)
1857             return mw.finish_publishing()
1858         d = _write_share(mw1)
1859         d.addCallback(_check_success)
1860         d.addCallback(lambda ignored:
1861             _write_share(mw2))
1862         d.addCallback(_check_failure)
1863         return d
1864
1865
1866     def test_invalid_salt_size(self):
1867         # Salts need to be 16 bytes in size. Writes that attempt to
1868         # write more or less than this should be rejected.
1869         mw = self._make_new_mw("si1", 0)
1870         invalid_salt = "a" * 17 # 17 bytes
1871         another_invalid_salt = "b" * 15 # 15 bytes
1872         d = defer.succeed(None)
1873         d.addCallback(lambda ignored:
1874             self.shouldFail(LayoutInvalid, "salt too big",
1875                             None,
1876                             mw.put_block, self.block, 0, invalid_salt))
1877         d.addCallback(lambda ignored:
1878             self.shouldFail(LayoutInvalid, "salt too small",
1879                             None,
1880                             mw.put_block, self.block, 0,
1881                             another_invalid_salt))
1882         return d
1883
1884
1885     def test_write_test_vectors(self):
1886         # If we give the write proxy a bogus test vector at
1887         # any point during the process, it should fail to write when we
1888         # tell it to write.
1889         def _check_failure(results):
1890             self.failUnlessEqual(len(results), 2)
1891             res, readvs = results
1892             self.failIf(res)
1893
1894         def _check_success(results):
1895             self.failUnlessEqual(len(results), 2)
1896             res, readvs = results
1897             self.failUnless(res)
1898
1899         mw = self._make_new_mw("si1", 0)
1900         mw.set_checkstring("this is a lie")
1901         for i in xrange(6):
1902             mw.put_block(self.block, i, self.salt)
1903         mw.put_encprivkey(self.encprivkey)
1904         mw.put_blockhashes(self.block_hash_tree)
1905         mw.put_sharehashes(self.share_hash_chain)
1906         mw.put_root_hash(self.root_hash)
1907         mw.put_signature(self.signature)
1908         mw.put_verification_key(self.verification_key)
1909         d = mw.finish_publishing()
1910         d.addCallback(_check_failure)
1911         d.addCallback(lambda ignored:
1912             mw.set_checkstring(""))
1913         d.addCallback(lambda ignored:
1914             mw.finish_publishing())
1915         d.addCallback(_check_success)
1916         return d
1917
1918
1919     def serialize_blockhashes(self, blockhashes):
1920         return "".join(blockhashes)
1921
1922
1923     def serialize_sharehashes(self, sharehashes):
1924         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1925                         for i in sorted(sharehashes.keys())])
1926         return ret
1927
1928
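    # Inverse of serialize_sharehashes, included as an illustrative
    # sketch (not part of the original suite): each chain entry is a
    # 2-byte big-endian index followed by a 32-byte hash (">H32s",
    # 34 bytes per entry).
    def deserialize_sharehashes(self, data):
        chain = {}
        for start in xrange(0, len(data), 34):
            (index, sharehash) = struct.unpack(">H32s",
                                               data[start:start + 34])
            chain[index] = sharehash
        return chain

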
1929     def test_write(self):
1930         # This translates to a file with 6 6-byte segments, and with 2-byte
1931         # blocks.
1932         mw = self._make_new_mw("si1", 0)
1933         # Test writing some blocks.
1934         read = self.ss.remote_slot_readv
1935         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1936         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1937                                     PRIVATE_KEY_SIZE + \
1938                                     SIGNATURE_SIZE + \
1939                                     VERIFICATION_KEY_SIZE + \
1940                                     SHARE_HASH_CHAIN_SIZE
1941         written_block_size = 2 + len(self.salt)
1942         written_block = self.block + self.salt
1943         for i in xrange(6):
1944             mw.put_block(self.block, i, self.salt)
1945
1946         mw.put_encprivkey(self.encprivkey)
1947         mw.put_blockhashes(self.block_hash_tree)
1948         mw.put_sharehashes(self.share_hash_chain)
1949         mw.put_root_hash(self.root_hash)
1950         mw.put_signature(self.signature)
1951         mw.put_verification_key(self.verification_key)
1952         d = mw.finish_publishing()
1953         def _check_publish(results):
1954             self.failUnlessEqual(len(results), 2)
1955             result, ign = results
1956             self.failUnless(result, "publish failed")
1957             for i in xrange(6):
1958                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1959                                 {0: [written_block]})
1960
1961             self.failUnlessEqual(len(self.encprivkey), 7)
1962             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1963                                  {0: [self.encprivkey]})
1964
1965             expected_block_hash_offset = expected_sharedata_offset + \
1966                         (6 * written_block_size)
1967             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1968             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1969                                  {0: [self.block_hash_tree_s]})
1970
1971             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1972             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1973                                  {0: [self.share_hash_chain_s]})
1974
1975             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1976                                  {0: [self.root_hash]})
1977             expected_signature_offset = expected_share_hash_offset + \
1978                 len(self.share_hash_chain_s)
1979             self.failUnlessEqual(len(self.signature), 9)
1980             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1981                                  {0: [self.signature]})
1982
1983             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1984             self.failUnlessEqual(len(self.verification_key), 6)
1985             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1986                                  {0: [self.verification_key]})
1987
1988             signable = mw.get_signable()
1989             verno, seq, roothash, k, n, segsize, datalen = \
1990                                             struct.unpack(">BQ32sBBQQ",
1991                                                           signable)
1992             self.failUnlessEqual(verno, 1)
1993             self.failUnlessEqual(seq, 0)
1994             self.failUnlessEqual(roothash, self.root_hash)
1995             self.failUnlessEqual(k, 3)
1996             self.failUnlessEqual(n, 10)
1997             self.failUnlessEqual(segsize, 6)
1998             self.failUnlessEqual(datalen, 36)
1999             expected_eof_offset = expected_block_hash_offset + \
2000                 len(self.block_hash_tree_s)
2001
2002             # Check the version number to make sure that it is correct.
2003             expected_version_number = struct.pack(">B", 1)
2004             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2005                                  {0: [expected_version_number]})
2006             # Check the sequence number to make sure that it is correct
2007             expected_sequence_number = struct.pack(">Q", 0)
2008             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2009                                  {0: [expected_sequence_number]})
2010             # Check that the encoding parameters (k, N, segment size, data
2011             # length) are what they should be. These are 3, 10, 6, and 36.
2012             expected_k = struct.pack(">B", 3)
2013             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2014                                  {0: [expected_k]})
2015             expected_n = struct.pack(">B", 10)
2016             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2017                                  {0: [expected_n]})
2018             expected_segment_size = struct.pack(">Q", 6)
2019             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2020                                  {0: [expected_segment_size]})
2021             expected_data_length = struct.pack(">Q", 36)
2022             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2023                                  {0: [expected_data_length]})
2024             expected_offset = struct.pack(">Q", expected_private_key_offset)
2025             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2026                                  {0: [expected_offset]})
2027             expected_offset = struct.pack(">Q", expected_share_hash_offset)
2028             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2029                                  {0: [expected_offset]})
2030             expected_offset = struct.pack(">Q", expected_signature_offset)
2031             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2032                                  {0: [expected_offset]})
2033             expected_offset = struct.pack(">Q", expected_verification_key_offset)
2034             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2035                                  {0: [expected_offset]})
2036             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2037             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2038                                  {0: [expected_offset]})
2039             expected_offset = struct.pack(">Q", expected_sharedata_offset)
2040             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2041                                  {0: [expected_offset]})
2042             expected_offset = struct.pack(">Q", expected_block_hash_offset)
2043             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2044                                  {0: [expected_offset]})
2045             expected_offset = struct.pack(">Q", expected_eof_offset)
2046             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2047                                  {0: [expected_offset]})
2048         d.addCallback(_check_publish)
2049         return d
2050
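    # Derivation of the magic offsets that _check_publish reads above,
    # as an illustrative sketch: the signable MDMF prefix is version(1) +
    # seqnum(8) + root_hash(32) + k(1) + N(1) + segsize(8) + datalen(8)
    # = 59 bytes, followed by eight 8-byte offset-table entries.
    def _example_mdmf_field_offsets(self):
        fields = {"version": 0,    # 1 byte
                  "seqnum": 1,     # 8 bytes
                  "root_hash": 9,  # 32 bytes
                  "k": 41,         # 1 byte
                  "n": 42,         # 1 byte
                  "segsize": 43,   # 8 bytes
                  "datalen": 51}   # 8 bytes
        # The offset-table entries live at 59, 67, 75, 83, 91, 99, 107,
        # and 115, so the header ends at byte 123.
        fields["offset_table"] = [59 + 8 * i for i in xrange(8)]
        return fields
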
2051     def _make_new_mw(self, si, share, datalength=36):
2052         # This describes a file of 36 bytes. With a segment size of
2053         # 6, it has six 6-byte segments, which will be split into
2054         # blocks of 2 bytes because our FEC k parameter is 3.
2056         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2057                                 6, datalength)
2058         return mw
2059
2060
2061     def test_write_rejected_with_too_many_blocks(self):
2062         mw = self._make_new_mw("si0", 0)
2063
2064         # Try writing too many blocks. We should not be able to write
2065         # more than 6 blocks into each share.
2067         d = defer.succeed(None)
2068         for i in xrange(6):
2069             d.addCallback(lambda ignored, i=i:
2070                 mw.put_block(self.block, i, self.salt))
2071         d.addCallback(lambda ignored:
2072             self.shouldFail(LayoutInvalid, "too many blocks",
2073                             None,
2074                             mw.put_block, self.block, 7, self.salt))
2075         return d
2076
2077
2078     def test_write_rejected_with_invalid_salt(self):
2079         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2080         # less should cause an error.
2081         mw = self._make_new_mw("si1", 0)
2082         bad_salt = "a" * 17 # 17 bytes
2083         d = defer.succeed(None)
2084         d.addCallback(lambda ignored:
2085             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2086                             None, mw.put_block, self.block, 7, bad_salt))
2087         return d
2088
2089
2090     def test_write_rejected_with_invalid_root_hash(self):
2091         # Try writing an invalid root hash. This should be SHA256d, and
2092         # 32 bytes long as a result.
2093         mw = self._make_new_mw("si2", 0)
2094         # 17 bytes != 32 bytes
2095         invalid_root_hash = "a" * 17
2096         d = defer.succeed(None)
2097         # Before this test can work, we need to put some blocks + salts,
2098         # a block hash tree, and a share hash chain. Otherwise, we'll
2099         # see failures that match what we are looking for, but which
2100         # are really caused by the constraints on operation ordering.
2101         for i in xrange(6):
2102             d.addCallback(lambda ignored, i=i:
2103                 mw.put_block(self.block, i, self.salt))
2104         d.addCallback(lambda ignored:
2105             mw.put_encprivkey(self.encprivkey))
2106         d.addCallback(lambda ignored:
2107             mw.put_blockhashes(self.block_hash_tree))
2108         d.addCallback(lambda ignored:
2109             mw.put_sharehashes(self.share_hash_chain))
2110         d.addCallback(lambda ignored:
2111             self.shouldFail(LayoutInvalid, "invalid root hash",
2112                             None, mw.put_root_hash, invalid_root_hash))
2113         return d
2114
2115
2116     def test_write_rejected_with_invalid_blocksize(self):
2117         # The blocksize implied by the writer that we get from
2118         # _make_new_mw is 2 bytes -- any more or any less than this
2119         # should be cause for failure, unless it is the tail segment,
2120         # in which case it may not be a failure.
2121         invalid_block = "a"
2122         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2123                                              # one byte blocks
2124         # 1 byte != 2 bytes
2125         d = defer.succeed(None)
2126         d.addCallback(lambda ignored, invalid_block=invalid_block:
2127             self.shouldFail(LayoutInvalid, "test blocksize too small",
2128                             None, mw.put_block, invalid_block, 0,
2129                             self.salt))
2130         invalid_block = invalid_block * 3
2131         # 3 bytes != 2 bytes
2132         d.addCallback(lambda ignored:
2133             self.shouldFail(LayoutInvalid, "test blocksize too large",
2134                             None,
2135                             mw.put_block, invalid_block, 0, self.salt))
2136         for i in xrange(5):
2137             d.addCallback(lambda ignored, i=i:
2138                 mw.put_block(self.block, i, self.salt))
2139         # Try to put an invalid tail segment
2140         d.addCallback(lambda ignored:
2141             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2142                             None,
2143                             mw.put_block, self.block, 5, self.salt))
2144         valid_block = "a"
2145         d.addCallback(lambda ignored:
2146             mw.put_block(valid_block, 5, self.salt))
2147         return d
2148
2149
2150     def test_write_enforces_order_constraints(self):
2151         # We require that the MDMFSlotWriteProxy be interacted with in a
2152         # specific way.
2153         # That way is:
2154         # 0: __init__
2155         # 1: write blocks and salts
2156         # 2: Write the encrypted private key
2157         # 3: Write the block hashes
2158         # 4: Write the share hashes
2159         # 5: Write the root hash and salt hash
2160         # 6: Write the signature and verification key
2161         # 7: Write the file.
2162         #
2163         # Some of these can be performed out-of-order, and some can't.
2164         # The dependencies that I want to test here are:
2165         #  - Private key before block hashes
2166         #  - share hashes and block hashes before root hash
2167         #  - root hash before signature
2168         #  - signature before verification key
2169         mw0 = self._make_new_mw("si0", 0)
2170         # Write some shares
2171         d = defer.succeed(None)
2172         for i in xrange(6):
2173             d.addCallback(lambda ignored, i=i:
2174                 mw0.put_block(self.block, i, self.salt))
2175
2176         # Try to write the share hash chain without writing the
2177         # encrypted private key
2178         d.addCallback(lambda ignored:
2179             self.shouldFail(LayoutInvalid, "share hash chain before "
2180                                            "private key",
2181                             None,
2182                             mw0.put_sharehashes, self.share_hash_chain))
2183         # Write the private key.
2184         d.addCallback(lambda ignored:
2185             mw0.put_encprivkey(self.encprivkey))
2186
2187         # Now write the block hashes.
2188         d.addCallback(lambda ignored:
2189             mw0.put_blockhashes(self.block_hash_tree))
2190
2191         # We haven't yet put the root hash on the share, so we shouldn't
2192         # be able to sign it.
2193         d.addCallback(lambda ignored:
2194             self.shouldFail(LayoutInvalid, "signature before root hash",
2195                             None, mw0.put_signature, self.signature))
2196
2197         d.addCallback(lambda ignored:
2198             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2199
2200         # ...and, since that fails, we also shouldn't be able to put the
2201         # verification key.
2202         d.addCallback(lambda ignored:
2203             self.shouldFail(LayoutInvalid, "key before signature",
2204                             None, mw0.put_verification_key,
2205                             self.verification_key))
2206
2207         # Now write the share hashes.
2208         d.addCallback(lambda ignored:
2209             mw0.put_sharehashes(self.share_hash_chain))
2210         # We should be able to write the root hash now too
2211         d.addCallback(lambda ignored:
2212             mw0.put_root_hash(self.root_hash))
2213
2214         # We should still be unable to put the verification key
2215         d.addCallback(lambda ignored:
2216             self.shouldFail(LayoutInvalid, "key before signature",
2217                             None, mw0.put_verification_key,
2218                             self.verification_key))
2219
2220         d.addCallback(lambda ignored:
2221             mw0.put_signature(self.signature))
2222
2223         # We shouldn't be able to write the offsets to the remote server
2224         # until the offset table is finished; IOW, until we have written
2225         # the verification key.
2226         d.addCallback(lambda ignored:
2227             self.shouldFail(LayoutInvalid, "offsets before verification key",
2228                             None,
2229                             mw0.finish_publishing))
2230
2231         d.addCallback(lambda ignored:
2232             mw0.put_verification_key(self.verification_key))
2233         return d
2234
2235
2236     def test_end_to_end(self):
2237         mw = self._make_new_mw("si1", 0)
2238         # Write a share using the mutable writer, and make sure that the
2239         # reader knows how to read everything back to us.
2240         d = defer.succeed(None)
2241         for i in xrange(6):
2242             d.addCallback(lambda ignored, i=i:
2243                 mw.put_block(self.block, i, self.salt))
2244         d.addCallback(lambda ignored:
2245             mw.put_encprivkey(self.encprivkey))
2246         d.addCallback(lambda ignored:
2247             mw.put_blockhashes(self.block_hash_tree))
2248         d.addCallback(lambda ignored:
2249             mw.put_sharehashes(self.share_hash_chain))
2250         d.addCallback(lambda ignored:
2251             mw.put_root_hash(self.root_hash))
2252         d.addCallback(lambda ignored:
2253             mw.put_signature(self.signature))
2254         d.addCallback(lambda ignored:
2255             mw.put_verification_key(self.verification_key))
2256         d.addCallback(lambda ignored:
2257             mw.finish_publishing())
2258
2259         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2260         def _check_block_and_salt((block, salt)):
2261             self.failUnlessEqual(block, self.block)
2262             self.failUnlessEqual(salt, self.salt)
2263
2264         for i in xrange(6):
2265             d.addCallback(lambda ignored, i=i:
2266                 mr.get_block_and_salt(i))
2267             d.addCallback(_check_block_and_salt)
2268
2269         d.addCallback(lambda ignored:
2270             mr.get_encprivkey())
2271         d.addCallback(lambda encprivkey:
2272             self.failUnlessEqual(self.encprivkey, encprivkey))
2273
2274         d.addCallback(lambda ignored:
2275             mr.get_blockhashes())
2276         d.addCallback(lambda blockhashes:
2277             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2278
2279         d.addCallback(lambda ignored:
2280             mr.get_sharehashes())
2281         d.addCallback(lambda sharehashes:
2282             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2283
2284         d.addCallback(lambda ignored:
2285             mr.get_signature())
2286         d.addCallback(lambda signature:
2287             self.failUnlessEqual(signature, self.signature))
2288
2289         d.addCallback(lambda ignored:
2290             mr.get_verification_key())
2291         d.addCallback(lambda verification_key:
2292             self.failUnlessEqual(verification_key, self.verification_key))
2293
2294         d.addCallback(lambda ignored:
2295             mr.get_seqnum())
2296         d.addCallback(lambda seqnum:
2297             self.failUnlessEqual(seqnum, 0))
2298
2299         d.addCallback(lambda ignored:
2300             mr.get_root_hash())
2301         d.addCallback(lambda root_hash:
2302             self.failUnlessEqual(self.root_hash, root_hash))
2303
2304         d.addCallback(lambda ignored:
2305             mr.get_encoding_parameters())
2306         def _check_encoding_parameters((k, n, segsize, datalen)):
2307             self.failUnlessEqual(k, 3)
2308             self.failUnlessEqual(n, 10)
2309             self.failUnlessEqual(segsize, 6)
2310             self.failUnlessEqual(datalen, 36)
2311         d.addCallback(_check_encoding_parameters)
2312
2313         d.addCallback(lambda ignored:
2314             mr.get_checkstring())
2315         d.addCallback(lambda checkstring:
2316             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2317         return d
2318
2319
2320     def test_is_sdmf(self):
2321         # The MDMFSlotReadProxy should also know how to read SDMF files,
2322         # since it will encounter them on the grid. Callers use the
2323         # is_sdmf method to test this.
2324         self.write_sdmf_share_to_server("si1")
2325         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2326         d = mr.is_sdmf()
2327         d.addCallback(lambda issdmf:
2328             self.failUnless(issdmf))
2329         return d
2330
2331
2332     def test_reads_sdmf(self):
2333         # The slot read proxy should, naturally, know how to tell us
2334         # about data in the SDMF format
2335         self.write_sdmf_share_to_server("si1")
2336         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2337         d = defer.succeed(None)
2338         d.addCallback(lambda ignored:
2339             mr.is_sdmf())
2340         d.addCallback(lambda issdmf:
2341             self.failUnless(issdmf))
2342
2343         # What do we need to read?
2344         #  - The sharedata
2345         #  - The salt
2346         d.addCallback(lambda ignored:
2347             mr.get_block_and_salt(0))
2348         def _check_block_and_salt(results):
2349             block, salt = results
2350             # Our original file is 36 bytes long, so each share is 12
2351             # bytes in size. The share is composed entirely of the
2352             # letter 'a'; self.block contains two of them, so 6 *
2353             # self.block is what we are looking for.
2354             self.failUnlessEqual(block, self.block * 6)
2355             self.failUnlessEqual(salt, self.salt)
2356         d.addCallback(_check_block_and_salt)
2357
2358         #  - The blockhashes
2359         d.addCallback(lambda ignored:
2360             mr.get_blockhashes())
2361         d.addCallback(lambda blockhashes:
2362             self.failUnlessEqual(self.block_hash_tree,
2363                                  blockhashes,
2364                                  blockhashes))
2365         #  - The sharehashes
2366         d.addCallback(lambda ignored:
2367             mr.get_sharehashes())
2368         d.addCallback(lambda sharehashes:
2369             self.failUnlessEqual(self.share_hash_chain,
2370                                  sharehashes))
2371         #  - The keys
2372         d.addCallback(lambda ignored:
2373             mr.get_encprivkey())
2374         d.addCallback(lambda encprivkey:
2375             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2376         d.addCallback(lambda ignored:
2377             mr.get_verification_key())
2378         d.addCallback(lambda verification_key:
2379             self.failUnlessEqual(verification_key,
2380                                  self.verification_key,
2381                                  verification_key))
2382         #  - The signature
2383         d.addCallback(lambda ignored:
2384             mr.get_signature())
2385         d.addCallback(lambda signature:
2386             self.failUnlessEqual(signature, self.signature, signature))
2387
2388         #  - The sequence number
2389         d.addCallback(lambda ignored:
2390             mr.get_seqnum())
2391         d.addCallback(lambda seqnum:
2392             self.failUnlessEqual(seqnum, 0, seqnum))
2393
2394         #  - The root hash
2395         d.addCallback(lambda ignored:
2396             mr.get_root_hash())
2397         d.addCallback(lambda root_hash:
2398             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2399         return d
2400
2401
2402     def test_only_reads_one_segment_sdmf(self):
2403         # SDMF shares have only one segment, so it doesn't make sense to
2404         # read more segments than that. The reader should know this and
2405         # complain if we try to do that.
2406         self.write_sdmf_share_to_server("si1")
2407         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2408         d = defer.succeed(None)
2409         d.addCallback(lambda ignored:
2410             mr.is_sdmf())
2411         d.addCallback(lambda issdmf:
2412             self.failUnless(issdmf))
2413         d.addCallback(lambda ignored:
2414             self.shouldFail(LayoutInvalid, "test bad segment",
2415                             None,
2416                             mr.get_block_and_salt, 1))
2417         return d
2418
2419
2420     def test_read_with_prefetched_mdmf_data(self):
2421         # The MDMFSlotReadProxy will prefill certain fields if you pass
2422         # it data that you have already fetched. This is useful for
2423         # cases like the Servermap, which prefetches ~2kb of data while
2424         # finding out which shares are on the remote peer so that it
2425         # doesn't waste round trips.
2426         mdmf_data = self.build_test_mdmf_share()
2427         self.write_test_share_to_server("si1")
2428         def _make_mr(ignored, length):
2429             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2430             return mr
2431
2432         d = defer.succeed(None)
2433         # This should be enough to fill in both the encoding parameters
2434         # and the table of offsets, which will complete the version
2435         # information tuple.
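        # (123 bytes = the 59-byte signable prefix plus the eight
        # 8-byte offset-table entries that test_write reads back above.)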
2436         d.addCallback(_make_mr, 123)
2437         d.addCallback(lambda mr:
2438             mr.get_verinfo())
2439         def _check_verinfo(verinfo):
2440             self.failUnless(verinfo)
2441             self.failUnlessEqual(len(verinfo), 9)
2442             (seqnum,
2443              root_hash,
2444              salt_hash,
2445              segsize,
2446              datalen,
2447              k,
2448              n,
2449              prefix,
2450              offsets) = verinfo
2451             self.failUnlessEqual(seqnum, 0)
2452             self.failUnlessEqual(root_hash, self.root_hash)
2453             self.failUnlessEqual(segsize, 6)
2454             self.failUnlessEqual(datalen, 36)
2455             self.failUnlessEqual(k, 3)
2456             self.failUnlessEqual(n, 10)
2457             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2458                                           1,
2459                                           seqnum,
2460                                           root_hash,
2461                                           k,
2462                                           n,
2463                                           segsize,
2464                                           datalen)
2465             self.failUnlessEqual(expected_prefix, prefix)
2466             self.failUnlessEqual(self.rref.read_count, 0)
2467         d.addCallback(_check_verinfo)
2468         # This is not enough data to read a block and a salt, so the
2469         # wrapper should attempt to read them from the remote server.
2470         d.addCallback(_make_mr, 123)
2471         d.addCallback(lambda mr:
2472             mr.get_block_and_salt(0))
2473         def _check_block_and_salt((block, salt)):
2474             self.failUnlessEqual(block, self.block)
2475             self.failUnlessEqual(salt, self.salt)
2476             self.failUnlessEqual(self.rref.read_count, 1)
2477         # This should be enough data to read one block.
2478         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2479         d.addCallback(lambda mr:
2480             mr.get_block_and_salt(0))
2481         d.addCallback(_check_block_and_salt)
2482         return d
2483
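    # A sketch of the prefetch idea exercised above (hypothetical helper,
    # not MDMFSlotReadProxy's real code): serve byte ranges out of the
    # prefetched prefix when possible, and fall back to a remote read
    # only on a cache miss.
    @staticmethod
    def _read_with_prefix_sketch(prefix, remote_read, offset, length):
        if offset + length <= len(prefix):
            return prefix[offset:offset+length]  # cache hit: no round trip
        return remote_read(offset, length)       # cache miss: one RPC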
2484
2485     def test_read_with_prefetched_sdmf_data(self):
2486         sdmf_data = self.build_test_sdmf_share()
2487         self.write_sdmf_share_to_server("si1")
2488         def _make_mr(ignored, length):
2489             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2490             return mr
2491
2492         d = defer.succeed(None)
2493         # This should be enough to get us the encoding parameters,
2494         # offset table, and everything else we need to build a verinfo
2495         # tuple.
2496         d.addCallback(_make_mr, 123)
2497         d.addCallback(lambda mr:
2498             mr.get_verinfo())
2499         def _check_verinfo(verinfo):
2500             self.failUnless(verinfo)
2501             self.failUnlessEqual(len(verinfo), 9)
2502             (seqnum,
2503              root_hash,
2504              salt,
2505              segsize,
2506              datalen,
2507              k,
2508              n,
2509              prefix,
2510              offsets) = verinfo
2511             self.failUnlessEqual(seqnum, 0)
2512             self.failUnlessEqual(root_hash, self.root_hash)
2513             self.failUnlessEqual(salt, self.salt)
2514             self.failUnlessEqual(segsize, 36)
2515             self.failUnlessEqual(datalen, 36)
2516             self.failUnlessEqual(k, 3)
2517             self.failUnlessEqual(n, 10)
2518             expected_prefix = struct.pack(SIGNED_PREFIX,
2519                                           0,
2520                                           seqnum,
2521                                           root_hash,
2522                                           salt,
2523                                           k,
2524                                           n,
2525                                           segsize,
2526                                           datalen)
2527             self.failUnlessEqual(expected_prefix, prefix)
2528             self.failUnlessEqual(self.rref.read_count, 0)
2529         d.addCallback(_check_verinfo)
2530         # This shouldn't be enough to read any share data.
2531         d.addCallback(_make_mr, 123)
2532         d.addCallback(lambda mr:
2533             mr.get_block_and_salt(0))
2534         def _check_block_and_salt((block, salt)):
2535             self.failUnlessEqual(block, self.block * 6)
2536             self.failUnlessEqual(salt, self.salt)
2537             # TODO: Fix the read routine so that it reads only the data
2538             #       that it has cached if it can't read all of it.
2539             self.failUnlessEqual(self.rref.read_count, 2)
2540
2541         # This should be enough to read share data.
2542         d.addCallback(_make_mr, self.offsets['share_data'])
2543         d.addCallback(lambda mr:
2544             mr.get_block_and_salt(0))
2545         d.addCallback(_check_block_and_salt)
2546         return d
2547
2548
2549     def test_read_with_empty_mdmf_file(self):
2550         # Some tests upload a file with no contents to test things
2551         # unrelated to the handling of the file's contents. The
2552         # reader should behave sensibly in these cases.
2553         self.write_test_share_to_server("si1", empty=True)
2554         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2555         # We should be able to get the encoding parameters, and they
2556         # should be correct.
2557         d = defer.succeed(None)
2558         d.addCallback(lambda ignored:
2559             mr.get_encoding_parameters())
2560         def _check_encoding_parameters(params):
2561             self.failUnlessEqual(len(params), 4)
2562             k, n, segsize, datalen = params
2563             self.failUnlessEqual(k, 3)
2564             self.failUnlessEqual(n, 10)
2565             self.failUnlessEqual(segsize, 0)
2566             self.failUnlessEqual(datalen, 0)
2567         d.addCallback(_check_encoding_parameters)
2568
2569         # We should not be able to fetch a block, since there are no
2570         # blocks to fetch
2571         d.addCallback(lambda ignored:
2572             self.shouldFail(LayoutInvalid, "get block on empty file",
2573                             None,
2574                             mr.get_block_and_salt, 0))
2575         return d
2576
2577
2578     def test_read_with_empty_sdmf_file(self):
2579         self.write_sdmf_share_to_server("si1", empty=True)
2580         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2581         # We should be able to get the encoding parameters, and they
2582         # should be correct
2583         d = defer.succeed(None)
2584         d.addCallback(lambda ignored:
2585             mr.get_encoding_parameters())
2586         def _check_encoding_parameters(params):
2587             self.failUnlessEqual(len(params), 4)
2588             k, n, segsize, datalen = params
2589             self.failUnlessEqual(k, 3)
2590             self.failUnlessEqual(n, 10)
2591             self.failUnlessEqual(segsize, 0)
2592             self.failUnlessEqual(datalen, 0)
2593         d.addCallback(_check_encoding_parameters)
2594
2595         # It does not make sense to get a block from an empty file,
2596         # so we should not be able to.
2597         d.addCallback(lambda ignored:
2598             self.shouldFail(LayoutInvalid, "get block on an empty file",
2599                             None,
2600                             mr.get_block_and_salt, 0))
2601         return d
2602
2603
2604     def test_verinfo_with_sdmf_file(self):
2605         self.write_sdmf_share_to_server("si1")
2606         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2607         # We should be able to get the version information.
2608         d = defer.succeed(None)
2609         d.addCallback(lambda ignored:
2610             mr.get_verinfo())
2611         def _check_verinfo(verinfo):
2612             self.failUnless(verinfo)
2613             self.failUnlessEqual(len(verinfo), 9)
2614             (seqnum,
2615              root_hash,
2616              salt,
2617              segsize,
2618              datalen,
2619              k,
2620              n,
2621              prefix,
2622              offsets) = verinfo
2623             self.failUnlessEqual(seqnum, 0)
2624             self.failUnlessEqual(root_hash, self.root_hash)
2625             self.failUnlessEqual(salt, self.salt)
2626             self.failUnlessEqual(segsize, 36)
2627             self.failUnlessEqual(datalen, 36)
2628             self.failUnlessEqual(k, 3)
2629             self.failUnlessEqual(n, 10)
2630             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2631                                           0,
2632                                           seqnum,
2633                                           root_hash,
2634                                           salt,
2635                                           k,
2636                                           n,
2637                                           segsize,
2638                                           datalen)
2639             self.failUnlessEqual(prefix, expected_prefix)
2640             self.failUnlessEqual(offsets, self.offsets)
2641         d.addCallback(_check_verinfo)
2642         return d
2643
2644
2645     def test_verinfo_with_mdmf_file(self):
2646         self.write_test_share_to_server("si1")
2647         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2648         d = defer.succeed(None)
2649         d.addCallback(lambda ignored:
2650             mr.get_verinfo())
2651         def _check_verinfo(verinfo):
2652             self.failUnless(verinfo)
2653             self.failUnlessEqual(len(verinfo), 9)
2654             (seqnum,
2655              root_hash,
2656              IV,
2657              segsize,
2658              datalen,
2659              k,
2660              n,
2661              prefix,
2662              offsets) = verinfo
2663             self.failUnlessEqual(seqnum, 0)
2664             self.failUnlessEqual(root_hash, self.root_hash)
2665             self.failIf(IV)
2666             self.failUnlessEqual(segsize, 6)
2667             self.failUnlessEqual(datalen, 36)
2668             self.failUnlessEqual(k, 3)
2669             self.failUnlessEqual(n, 10)
2670             expected_prefix = struct.pack(">BQ32s BBQQ",
2671                                           1,
2672                                           seqnum,
2673                                           root_hash,
2674                                           k,
2675                                           n,
2676                                           segsize,
2677                                           datalen)
2678             self.failUnlessEqual(prefix, expected_prefix)
2679             self.failUnlessEqual(offsets, self.offsets)
2680         d.addCallback(_check_verinfo)
2681         return d
2682
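    # Sketch: both signable prefixes above are fixed-size big-endian
    # structs (75 bytes for SDMF, 59 for MDMF), so struct.unpack with the
    # same format strings recovers the packed fields (whitespace inside a
    # struct format string is ignored). Illustrative helper only.
    @staticmethod
    def _unpack_prefixes_sketch(sdmf_prefix, mdmf_prefix):
        # SDMF: version, seqnum, root_hash, salt, k, n, segsize, datalen
        sdmf = struct.unpack(">BQ32s16s BBQQ", sdmf_prefix)
        # MDMF: version, seqnum, root_hash, k, n, segsize, datalen
        mdmf = struct.unpack(">BQ32s BBQQ", mdmf_prefix)
        return (sdmf, mdmf)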
2683
2684     def test_sdmf_writer(self):
2685         # Go through the motions of writing an SDMF share to the storage
2686         # server. Then read the storage server to see that the share got
2687         # written in the way that we think it should have.
2688
2689         # We do this first so that the necessary instance variables get
2690         # set the way we want them for the tests below.
2691         data = self.build_test_sdmf_share()
2692         sdmfr = SDMFSlotWriteProxy(0,
2693                                    self.rref,
2694                                    "si1",
2695                                    self.secrets,
2696                                    0, 3, 10, 36, 36)
2697         # Put the block and salt.
2698         sdmfr.put_block(self.blockdata, 0, self.salt)
2699
2700         # Put the encprivkey
2701         sdmfr.put_encprivkey(self.encprivkey)
2702
2703         # Put the block and share hash chains
2704         sdmfr.put_blockhashes(self.block_hash_tree)
2705         sdmfr.put_sharehashes(self.share_hash_chain)
2706         sdmfr.put_root_hash(self.root_hash)
2707
2708         # Put the signature
2709         sdmfr.put_signature(self.signature)
2710
2711         # Put the verification key
2712         sdmfr.put_verification_key(self.verification_key)
2713
2714         # Now check to make sure that nothing has been written yet.
2715         self.failUnlessEqual(self.rref.write_count, 0)
2716
2717         # Now finish publishing
2718         d = sdmfr.finish_publishing()
2719         def _then(ignored):
2720             self.failUnlessEqual(self.rref.write_count, 1)
2721             read = self.ss.remote_slot_readv
2722             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2723                                  {0: [data]})
2724         d.addCallback(_then)
2725         return d
2726
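    # The write_count assertions above rely on the proxy buffering every
    # put_* call and flushing them in a single writev RPC from
    # finish_publishing(). A minimal illustration of that pattern
    # (hypothetical class, not the real SDMFSlotWriteProxy):
    class _BufferedWriterSketch:
        def __init__(self, send_writev):
            self._send = send_writev
            self._queued = []
        def put(self, offset, data):
            self._queued.append((offset, data))  # buffered; no RPC yet
        def finish(self):
            return self._send(self._queued)      # exactly one writev RPC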
2727
2728     def test_sdmf_writer_preexisting_share(self):
2729         data = self.build_test_sdmf_share()
2730         self.write_sdmf_share_to_server("si1")
2731
2732         # Now there is a share on the storage server. To successfully
2733         # write, we need to set the checkstring correctly. When we
2734         # don't, no write should occur.
2735         sdmfw = SDMFSlotWriteProxy(0,
2736                                    self.rref,
2737                                    "si1",
2738                                    self.secrets,
2739                                    1, 3, 10, 36, 36)
2740         sdmfw.put_block(self.blockdata, 0, self.salt)
2741
2742         # Put the encprivkey
2743         sdmfw.put_encprivkey(self.encprivkey)
2744
2745         # Put the block and share hash chains
2746         sdmfw.put_blockhashes(self.block_hash_tree)
2747         sdmfw.put_sharehashes(self.share_hash_chain)
2748
2749         # Put the root hash
2750         sdmfw.put_root_hash(self.root_hash)
2751
2752         # Put the signature
2753         sdmfw.put_signature(self.signature)
2754
2755         # Put the verification key
2756         sdmfw.put_verification_key(self.verification_key)
2757
2758         # We shouldn't have a checkstring yet
2759         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2760
2761         d = sdmfw.finish_publishing()
2762         def _then(results):
2763             self.failIf(results[0])
2764             # this is the correct checkstring
2765             self._expected_checkstring = results[1][0][0]
2766             return self._expected_checkstring
2767
2768         d.addCallback(_then)
2769         d.addCallback(sdmfw.set_checkstring)
2770         d.addCallback(lambda ignored:
2771             sdmfw.get_checkstring())
2772         d.addCallback(lambda checkstring:
2773             self.failUnlessEqual(checkstring, self._expected_checkstring))
2774         d.addCallback(lambda ignored:
2775             sdmfw.finish_publishing())
2776         def _then_again(results):
2777             self.failUnless(results[0])
2778             read = self.ss.remote_slot_readv
2779             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2780                                  {0: [struct.pack(">Q", 1)]})
2781             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2782                                  {0: [data[9:]]})
2783         d.addCallback(_then_again)
2784         return d
2785
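    # Test-and-set sketch of the checkstring dance above (assumed
    # semantics): the server applies a writev only when the share's
    # current header matches the writer's checkstring; on a mismatch it
    # reports failure plus the real header, which the writer adopts via
    # set_checkstring() before retrying.
    @staticmethod
    def _test_and_set_sketch(current_header, checkstring, apply_write):
        if current_header != checkstring:
            return (False, current_header)  # caller retries with this
        apply_write()
        return (True, current_header)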
2786
2787 class Stats(unittest.TestCase):
2788
2789     def setUp(self):
2790         self.sparent = LoggingServiceParent()
2791         self._lease_secret = itertools.count()
2792     def tearDown(self):
2793         return self.sparent.stopService()
2794
2795     def workdir(self, name):
2796         basedir = os.path.join("storage", "Server", name)
2797         return basedir
2798
2799     def create(self, name):
2800         workdir = self.workdir(name)
2801         ss = StorageServer(workdir, "\x00" * 20)
2802         ss.setServiceParent(self.sparent)
2803         return ss
2804
2805     def test_latencies(self):
2806         ss = self.create("test_latencies")
2807         for i in range(10000):
2808             ss.add_latency("allocate", 1.0 * i)
2809         for i in range(1000):
2810             ss.add_latency("renew", 1.0 * i)
2811         for i in range(20):
2812             ss.add_latency("write", 1.0 * i)
2813         for i in range(10):
2814             ss.add_latency("cancel", 2.0 * i)
2815         ss.add_latency("get", 5.0)
2816
2817         output = ss.get_latencies()
2818
2819         self.failUnlessEqual(sorted(output.keys()),
2820                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2821         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2822         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2823         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2824         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2825         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2826         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2827         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2828         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2829         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2830
2831         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2832         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2833         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2834         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2835         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2836         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2837         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2838         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2839         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2840
2841         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2842         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2843         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2844         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2845         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2846         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2847         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2848         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2849         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2850
2851         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2852         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2853         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2854         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2855         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2856         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2857         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2858         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2859         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2860
2861         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2862         self.failUnless(output["get"]["mean"] is None, output)
2863         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2864         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2865         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2866         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2867         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2868         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2869         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2870
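# A sketch of the order statistics test_latencies checks above, assuming
# the server keeps only the most recent samples per category and reports
# a percentile only when enough samples exist to make it meaningful
# (hypothetical helper; the thresholds are inferred from the test):
def percentile_sketch(samples, fraction, min_samples):
    if len(samples) < min_samples:
        return None                  # e.g. 99.9% needs >= 1000 samples
    return sorted(samples)[int(fraction * len(samples))]
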
2871 def remove_tags(s):
2872     s = re.sub(r'<[^>]*>', ' ', s)
2873     s = re.sub(r'\s+', ' ', s)
2874     return s
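# e.g. remove_tags("<p>So far</p>") == " So far "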
2875
2876 class MyBucketCountingCrawler(BucketCountingCrawler):
2877     def finished_prefix(self, cycle, prefix):
2878         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2879         if self.hook_ds:
2880             d = self.hook_ds.pop(0)
2881             d.callback(None)
2882
2883 class MyStorageServer(StorageServer):
2884     def add_bucket_counter(self):
2885         statefile = os.path.join(self.storedir, "bucket_counter.state")
2886         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2887         self.bucket_counter.setServiceParent(self)
2888
2889 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2890
2891     def setUp(self):
2892         self.s = service.MultiService()
2893         self.s.startService()
2894     def tearDown(self):
2895         return self.s.stopService()
2896
2897     def test_bucket_counter(self):
2898         basedir = "storage/BucketCounter/bucket_counter"
2899         fileutil.make_dirs(basedir)
2900         ss = StorageServer(basedir, "\x00" * 20)
2901         # to make sure we capture the bucket-counting-crawler in the middle
2902         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2903         # also make it start sooner than usual.
2904         ss.bucket_counter.slow_start = 0
2905         orig_cpu_slice = ss.bucket_counter.cpu_slice
2906         ss.bucket_counter.cpu_slice = 0
2907         ss.setServiceParent(self.s)
2908
2909         w = StorageStatus(ss)
2910
2911         # this sample is before the crawler has started doing anything
2912         html = w.renderSynchronously()
2913         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2914         s = remove_tags(html)
2915         self.failUnlessIn("Accepting new shares: Yes", s)
2916         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2917         self.failUnlessIn("Total buckets: Not computed yet", s)
2918         self.failUnlessIn("Next crawl in", s)
2919
2920         # give the bucket-counting-crawler one tick to get started. The
2921         # cpu_slice=0 will force it to yield right after it processes the
2922         # first prefix
2923
2924         d = fireEventually()
2925         def _check(ignored):
2926             # are we really right after the first prefix?
2927             state = ss.bucket_counter.get_state()
2928             if state["last-complete-prefix"] is None:
2929                 d2 = fireEventually()
2930                 d2.addCallback(_check)
2931                 return d2
2932             self.failUnlessEqual(state["last-complete-prefix"],
2933                                  ss.bucket_counter.prefixes[0])
2934             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2935             html = w.renderSynchronously()
2936             s = remove_tags(html)
2937             self.failUnlessIn(" Current crawl ", s)
2938             self.failUnlessIn(" (next work in ", s)
2939         d.addCallback(_check)
2940
2941         # now give it enough time to complete a full cycle
2942         def _watch():
2943             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2944         d.addCallback(lambda ignored: self.poll(_watch))
2945         def _check2(ignored):
2946             ss.bucket_counter.cpu_slice = orig_cpu_slice
2947             html = w.renderSynchronously()
2948             s = remove_tags(html)
2949             self.failUnlessIn("Total buckets: 0 (the number of", s)
2950             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2951         d.addCallback(_check2)
2952         return d
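
    # Time-slicing sketch of the crawler behaviour steered above
    # (hypothetical loop, not BucketCountingCrawler's real code): process
    # prefix directories until the allotted cpu_slice is spent, then
    # yield to the reactor. cpu_slice=0 therefore yields right after the
    # very first prefix, which is what _check() waits for.
    @staticmethod
    def _crawl_one_slice_sketch(prefixes, process, cpu_slice):
        start = time.time()
        done = []
        for prefix in prefixes:
            process(prefix)
            done.append(prefix)
            if time.time() - start > cpu_slice:
                break                # budget spent: yield to the reactor
        return done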
2953
2954     def test_bucket_counter_cleanup(self):
2955         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2956         fileutil.make_dirs(basedir)
2957         ss = StorageServer(basedir, "\x00" * 20)
2958         # to make sure we capture the bucket-counting-crawler in the middle
2959         # of a cycle, we reach in and reduce its maximum slice time to 0.
2960         ss.bucket_counter.slow_start = 0
2961         orig_cpu_slice = ss.bucket_counter.cpu_slice
2962         ss.bucket_counter.cpu_slice = 0
2963         ss.setServiceParent(self.s)
2964
2965         d = fireEventually()
2966
2967         def _after_first_prefix(ignored):
2968             state = ss.bucket_counter.state
2969             if state["last-complete-prefix"] is None:
2970                 d2 = fireEventually()
2971                 d2.addCallback(_after_first_prefix)
2972                 return d2
2973             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2974             # now sneak in and mess with its state, to make sure it cleans up
2975             # properly at the end of the cycle
2976             self.failUnlessEqual(state["last-complete-prefix"],
2977                                  ss.bucket_counter.prefixes[0])
2978             state["bucket-counts"][-12] = {}
2979             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2980             ss.bucket_counter.save_state()
2981         d.addCallback(_after_first_prefix)
2982
2983         # now give it enough time to complete a cycle
2984         def _watch():
2985             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2986         d.addCallback(lambda ignored: self.poll(_watch))
2987         def _check2(ignored):
2988             ss.bucket_counter.cpu_slice = orig_cpu_slice
2989             s = ss.bucket_counter.get_state()
2990             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2991             self.failIf("bogusprefix!" in s["storage-index-samples"],
2992                         s["storage-index-samples"].keys())
2993         d.addCallback(_check2)
2994         return d
2995
2996     def test_bucket_counter_eta(self):
2997         basedir = "storage/BucketCounter/bucket_counter_eta"
2998         fileutil.make_dirs(basedir)
2999         ss = MyStorageServer(basedir, "\x00" * 20)
3000         ss.bucket_counter.slow_start = 0
3001         # these will be fired inside finished_prefix()
3002         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
3003         w = StorageStatus(ss)
3004
3005         d = defer.Deferred()
3006
3007         def _check_1(ignored):
3008             # no ETA is available yet
3009             html = w.renderSynchronously()
3010             s = remove_tags(html)
3011             self.failUnlessIn("complete (next work", s)
3012
3013         def _check_2(ignored):
3014             # one prefix has finished, so an ETA based upon that elapsed time
3015             # should be available.
3016             html = w.renderSynchronously()
3017             s = remove_tags(html)
3018             self.failUnlessIn("complete (ETA ", s)
3019
3020         def _check_3(ignored):
3021             # two prefixes have finished
3022             html = w.renderSynchronously()
3023             s = remove_tags(html)
3024             self.failUnlessIn("complete (ETA ", s)
3025             d.callback("done")
3026
3027         hooks[0].addCallback(_check_1).addErrback(d.errback)
3028         hooks[1].addCallback(_check_2).addErrback(d.errback)
3029         hooks[2].addCallback(_check_3).addErrback(d.errback)
3030
3031         ss.setServiceParent(self.s)
3032         return d
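
    # ETA sketch matching _check_2/_check_3 above (assumed arithmetic,
    # not the crawler's exact formula): once at least one prefix has
    # finished, extrapolate linearly over the prefixes that remain.
    @staticmethod
    def _eta_sketch(elapsed, num_done, num_total):
        if num_done == 0:
            return None              # no ETA before the first prefix
        per_prefix = elapsed / float(num_done)
        return per_prefix * (num_total - num_done)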
3033
3034 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3035     stop_after_first_bucket = False
3036     def process_bucket(self, *args, **kwargs):
3037         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3038         if self.stop_after_first_bucket:
3039             self.stop_after_first_bucket = False
3040             self.cpu_slice = -1.0
3041     def yielding(self, sleep_time):
3042         if not self.stop_after_first_bucket:
3043             self.cpu_slice = 500
3044
3045 class BrokenStatResults:
3046     pass
3047 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3048     def stat(self, fn):
3049         s = os.stat(fn)
3050         bsr = BrokenStatResults()
3051         for attrname in dir(s):
3052             if attrname.startswith("_"):
3053                 continue
3054             if attrname == "st_blocks":
3055                 continue
3056             setattr(bsr, attrname, getattr(s, attrname))
3057         return bsr
3058
3059 class InstrumentedStorageServer(StorageServer):
3060     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3061 class No_ST_BLOCKS_StorageServer(StorageServer):
3062     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3063
3064 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3065
3066     def setUp(self):
3067         self.s = service.MultiService()
3068         self.s.startService()
3069     def tearDown(self):
3070         return self.s.stopService()
3071
3072     def make_shares(self, ss):
3073         def make(si):
3074             return (si, hashutil.tagged_hash("renew", si),
3075                     hashutil.tagged_hash("cancel", si))
3076         def make_mutable(si):
3077             return (si, hashutil.tagged_hash("renew", si),
3078                     hashutil.tagged_hash("cancel", si),
3079                     hashutil.tagged_hash("write-enabler", si))
3080         def make_extra_lease(si, num):
3081             return (hashutil.tagged_hash("renew-%d" % num, si),
3082                     hashutil.tagged_hash("cancel-%d" % num, si))
3083
3084         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3085         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3086         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3087         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3088         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3089         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3090         sharenums = [0]
3091         canary = FakeCanary()
3092         # note: 'tahoe debug dump-share' will not handle this file, since the
3093         # inner contents are not a valid CHK share
3094         data = "\xff" * 1000
3095
3096         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3097                                          1000, canary)
3098         w[0].remote_write(0, data)
3099         w[0].remote_close()
3100
3101         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3102                                          1000, canary)
3103         w[0].remote_write(0, data)
3104         w[0].remote_close()
3105         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3106
3107         writev = ss.remote_slot_testv_and_readv_and_writev
3108         writev(mutable_si_2, (we2, rs2, cs2),
3109                {0: ([], [(0,data)], len(data))}, [])
3110         writev(mutable_si_3, (we3, rs3, cs3),
3111                {0: ([], [(0,data)], len(data))}, [])
3112         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3113
3114         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3115         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3116         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3117
3118     def test_basic(self):
3119         basedir = "storage/LeaseCrawler/basic"
3120         fileutil.make_dirs(basedir)
3121         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3122         # make it start sooner than usual.
3123         lc = ss.lease_checker
3124         lc.slow_start = 0
3125         lc.cpu_slice = 500
3126         lc.stop_after_first_bucket = True
3127         webstatus = StorageStatus(ss)
3128
3129         # create a few shares, with some leases on them
3130         self.make_shares(ss)
3131         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3132
3133         # add a non-sharefile to exercise another code path
3134         fn = os.path.join(ss.sharedir,
3135                           storage_index_to_dir(immutable_si_0),
3136                           "not-a-share")
3137         f = open(fn, "wb")
3138         f.write("I am not a share.\n")
3139         f.close()
3140
3141         # this is before the crawl has started, so we're not in a cycle yet
3142         initial_state = lc.get_state()
3143         self.failIf(lc.get_progress()["cycle-in-progress"])
3144         self.failIfIn("cycle-to-date", initial_state)
3145         self.failIfIn("estimated-remaining-cycle", initial_state)
3146         self.failIfIn("estimated-current-cycle", initial_state)
3147         self.failUnlessIn("history", initial_state)
3148         self.failUnlessEqual(initial_state["history"], {})
3149
3150         ss.setServiceParent(self.s)
3151
3152         DAY = 24*60*60
3153
3154         d = fireEventually()
3155
3156         # now examine the state right after the first bucket has been
3157         # processed.
3158         def _after_first_bucket(ignored):
3159             initial_state = lc.get_state()
3160             if "cycle-to-date" not in initial_state:
3161                 d2 = fireEventually()
3162                 d2.addCallback(_after_first_bucket)
3163                 return d2
3164             self.failUnlessIn("cycle-to-date", initial_state)
3165             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3166             self.failUnlessIn("estimated-current-cycle", initial_state)
3167             self.failUnlessIn("history", initial_state)
3168             self.failUnlessEqual(initial_state["history"], {})
3169
3170             so_far = initial_state["cycle-to-date"]
3171             self.failUnlessEqual(so_far["expiration-enabled"], False)
3172             self.failUnlessIn("configured-expiration-mode", so_far)
3173             self.failUnlessIn("lease-age-histogram", so_far)
3174             lah = so_far["lease-age-histogram"]
3175             self.failUnlessEqual(type(lah), list)
3176             self.failUnlessEqual(len(lah), 1)
3177             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3178             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3179             self.failUnlessEqual(so_far["corrupt-shares"], [])
3180             sr1 = so_far["space-recovered"]
3181             self.failUnlessEqual(sr1["examined-buckets"], 1)
3182             self.failUnlessEqual(sr1["examined-shares"], 1)
3183             self.failUnlessEqual(sr1["actual-shares"], 0)
3184             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3185             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3186             left = initial_state["estimated-remaining-cycle"]
3187             sr2 = left["space-recovered"]
3188             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3189             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3190             self.failIfEqual(sr2["actual-shares"], None)
3191             self.failIfEqual(sr2["configured-diskbytes"], None)
3192             self.failIfEqual(sr2["original-sharebytes"], None)
3193         d.addCallback(_after_first_bucket)
3194         d.addCallback(lambda ign: self.render1(webstatus))
3195         def _check_html_in_cycle(html):
3196             s = remove_tags(html)
3197             self.failUnlessIn("So far, this cycle has examined "
3198                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3199             self.failUnlessIn("and has recovered: "
3200                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3201                               "0 B (0 B / 0 B)", s)
3202             self.failUnlessIn("If expiration were enabled, "
3203                               "we would have recovered: "
3204                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3205                               " 0 B (0 B / 0 B) by now", s)
3206             self.failUnlessIn("and the remainder of this cycle "
3207                               "would probably recover: "
3208                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3209                               " 0 B (0 B / 0 B)", s)
3210             self.failUnlessIn("and the whole cycle would probably recover: "
3211                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3212                               " 0 B (0 B / 0 B)", s)
3213             self.failUnlessIn("if we were strictly using each lease's default "
3214                               "31-day lease lifetime", s)
3215             self.failUnlessIn("this cycle would be expected to recover: ", s)
3216         d.addCallback(_check_html_in_cycle)
3217
3218         # wait for the crawler to finish the first cycle. Nothing should have
3219         # been removed.
3220         def _wait():
3221             return lc.get_state()["last-cycle-finished"] is not None
3222         d.addCallback(lambda ign: self.poll(_wait))
3223
3224         def _after_first_cycle(ignored):
3225             s = lc.get_state()
3226             self.failIf("cycle-to-date" in s)
3227             self.failIf("estimated-remaining-cycle" in s)
3228             self.failIf("estimated-current-cycle" in s)
3229             last = s["history"][0]
3230             self.failUnlessIn("cycle-start-finish-times", last)
3231             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3232             self.failUnlessEqual(last["expiration-enabled"], False)
3233             self.failUnlessIn("configured-expiration-mode", last)
3234
3235             self.failUnlessIn("lease-age-histogram", last)
3236             lah = last["lease-age-histogram"]
3237             self.failUnlessEqual(type(lah), list)
3238             self.failUnlessEqual(len(lah), 1)
3239             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3240
3241             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3242             self.failUnlessEqual(last["corrupt-shares"], [])
3243
3244             rec = last["space-recovered"]
3245             self.failUnlessEqual(rec["examined-buckets"], 4)
3246             self.failUnlessEqual(rec["examined-shares"], 4)
3247             self.failUnlessEqual(rec["actual-buckets"], 0)
3248             self.failUnlessEqual(rec["original-buckets"], 0)
3249             self.failUnlessEqual(rec["configured-buckets"], 0)
3250             self.failUnlessEqual(rec["actual-shares"], 0)
3251             self.failUnlessEqual(rec["original-shares"], 0)
3252             self.failUnlessEqual(rec["configured-shares"], 0)
3253             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3254             self.failUnlessEqual(rec["original-diskbytes"], 0)
3255             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3256             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3257             self.failUnlessEqual(rec["original-sharebytes"], 0)
3258             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3259
3260             def _get_sharefile(si):
3261                 return list(ss._iter_share_files(si))[0]
3262             def count_leases(si):
3263                 return len(list(_get_sharefile(si).get_leases()))
3264             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3265             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3266             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3267             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3268         d.addCallback(_after_first_cycle)
3269         d.addCallback(lambda ign: self.render1(webstatus))
3270         def _check_html(html):
3271             s = remove_tags(html)
3272             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3273                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3274             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3275                               "(2 mutable / 2 immutable),", s)
3276             self.failUnlessIn("but expiration was not enabled", s)
3277         d.addCallback(_check_html)
3278         d.addCallback(lambda ign: self.render_json(webstatus))
3279         def _check_json(json):
3280             data = simplejson.loads(json)
3281             self.failUnlessIn("lease-checker", data)
3282             self.failUnlessIn("lease-checker-progress", data)
3283         d.addCallback(_check_json)
3284         return d
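
    # Extrapolation sketch for the "estimated-remaining-cycle" numbers
    # checked above (assumed arithmetic, not the expirer's exact
    # estimator): scale the cycle-to-date counts by the fraction of the
    # prefix ring already crawled.
    @staticmethod
    def _remaining_cycle_sketch(count_so_far, fraction_done):
        if fraction_done <= 0:
            return None
        whole_cycle = count_so_far / fraction_done
        return whole_cycle - count_so_far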
3285
3286     def backdate_lease(self, sf, renew_secret, new_expire_time):
3287         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3288         # "renew" a lease with a new_expire_time that is older than what the
3289         # current lease has), so we have to reach inside it.
3290         for i,lease in enumerate(sf.get_leases()):
3291             if lease.renew_secret == renew_secret:
3292                 lease.expiration_time = new_expire_time
3293                 f = open(sf.home, 'rb+')
3294                 sf._write_lease_record(f, i, lease)
3295                 f.close()
3296                 return
3297         raise IndexError("unable to renew non-existent lease")
3298
3299     def test_expire_age(self):
3300         basedir = "storage/LeaseCrawler/expire_age"
3301         fileutil.make_dirs(basedir)
3302         # setting expiration_override_lease_duration to 2000 means that any
3303         # lease which is more than 2000s old will be expired.
3304         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3305                                        expiration_enabled=True,
3306                                        expiration_mode="age",
3307                                        expiration_override_lease_duration=2000)
3308         # make it start sooner than usual.
3309         lc = ss.lease_checker
3310         lc.slow_start = 0
3311         lc.stop_after_first_bucket = True
3312         webstatus = StorageStatus(ss)
3313
3314         # create a few shares, with some leases on them
3315         self.make_shares(ss)
3316         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3317
3318         def count_shares(si):
3319             return len(list(ss._iter_share_files(si)))
3320         def _get_sharefile(si):
3321             return list(ss._iter_share_files(si))[0]
3322         def count_leases(si):
3323             return len(list(_get_sharefile(si).get_leases()))
3324
3325         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3326         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3327         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3328         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3329         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3330         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3331         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3332         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3333
3334         # artificially crank back the expiration time on the first lease of
3335         # each share, to make it look like it expired 1000s ago. Some shares
3336         # have an extra lease which is set to expire at the default time, 31
3337         # days from now (i.e. a fresh lease). We then run the crawler, which
3338         # will expire the first lease, making some shares get deleted and
3339         # others stay alive (with one remaining lease each).
3340         now = time.time()
3341
3342         sf0 = _get_sharefile(immutable_si_0)
3343         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3344         sf0_size = os.stat(sf0.home).st_size
3345
3346         # immutable_si_1 gets an extra lease
3347         sf1 = _get_sharefile(immutable_si_1)
3348         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3349
3350         sf2 = _get_sharefile(mutable_si_2)
3351         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3352         sf2_size = os.stat(sf2.home).st_size
3353
3354         # mutable_si_3 gets an extra lease
3355         sf3 = _get_sharefile(mutable_si_3)
3356         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3357
3358         ss.setServiceParent(self.s)
3359
3360         d = fireEventually()
3361         # examine the state right after the first bucket has been processed
3362         def _after_first_bucket(ignored):
3363             p = lc.get_progress()
3364             if not p["cycle-in-progress"]:
3365                 d2 = fireEventually()
3366                 d2.addCallback(_after_first_bucket)
3367                 return d2
3368         d.addCallback(_after_first_bucket)
3369         d.addCallback(lambda ign: self.render1(webstatus))
3370         def _check_html_in_cycle(html):
3371             s = remove_tags(html)
3372             # the first bucket encountered gets deleted, and its prefix
3373             # happens to be about 1/5th of the way through the ring, so the
3374             # predictor thinks we'll have 5 shares and that we'll delete them
3375             # all. This part of the test depends upon the SIs landing right
3376             # where they do now.
3377             self.failUnlessIn("The remainder of this cycle is expected to "
3378                               "recover: 4 shares, 4 buckets", s)
3379             self.failUnlessIn("The whole cycle is expected to examine "
3380                               "5 shares in 5 buckets and to recover: "
3381                               "5 shares, 5 buckets", s)
3382         d.addCallback(_check_html_in_cycle)
3383
3384         # wait for the crawler to finish the first cycle. Two shares should
3385         # have been removed
3386         def _wait():
3387             return lc.get_state()["last-cycle-finished"] is not None
3388         d.addCallback(lambda ign: self.poll(_wait))
3389
3390         def _after_first_cycle(ignored):
3391             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3392             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3393             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3394             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3395             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3396             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3397
3398             s = lc.get_state()
3399             last = s["history"][0]
3400
3401             self.failUnlessEqual(last["expiration-enabled"], True)
3402             self.failUnlessEqual(last["configured-expiration-mode"],
3403                                  ("age", 2000, None, ("mutable", "immutable")))
3404             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3405
3406             rec = last["space-recovered"]
3407             self.failUnlessEqual(rec["examined-buckets"], 4)
3408             self.failUnlessEqual(rec["examined-shares"], 4)
3409             self.failUnlessEqual(rec["actual-buckets"], 2)
3410             self.failUnlessEqual(rec["original-buckets"], 2)
3411             self.failUnlessEqual(rec["configured-buckets"], 2)
3412             self.failUnlessEqual(rec["actual-shares"], 2)
3413             self.failUnlessEqual(rec["original-shares"], 2)
3414             self.failUnlessEqual(rec["configured-shares"], 2)
3415             size = sf0_size + sf2_size
3416             self.failUnlessEqual(rec["actual-sharebytes"], size)
3417             self.failUnlessEqual(rec["original-sharebytes"], size)
3418             self.failUnlessEqual(rec["configured-sharebytes"], size)
3419             # different platforms have different notions of "blocks used by
3420             # this file", so merely assert that it's a number
3421             self.failUnless(rec["actual-diskbytes"] >= 0,
3422                             rec["actual-diskbytes"])
3423             self.failUnless(rec["original-diskbytes"] >= 0,
3424                             rec["original-diskbytes"])
3425             self.failUnless(rec["configured-diskbytes"] >= 0,
3426                             rec["configured-diskbytes"])
3427         d.addCallback(_after_first_cycle)
3428         d.addCallback(lambda ign: self.render1(webstatus))
3429         def _check_html(html):
3430             s = remove_tags(html)
3431             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3432             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3433             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3434         d.addCallback(_check_html)
3435         return d
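
    # Age-mode expiry sketch (assumed semantics, matching the comments
    # above): a lease only records its expiration time, so the expirer
    # infers the renew time by subtracting the default 31-day grant
    # period, and expires the lease once that inferred age exceeds the
    # configured override duration.
    @staticmethod
    def _age_expired_sketch(lease_expiration_time, now, override_duration):
        grant_period = 31*24*60*60   # default lease lifetime
        renewed_at = lease_expiration_time - grant_period
        return (now - renewed_at) > override_duration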
3436
3437     def test_expire_cutoff_date(self):
3438         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3439         fileutil.make_dirs(basedir)
3440         # setting cutoff-date to 2000 seconds ago means that any lease which
3441         # is more than 2000s old will be expired.
3442         now = time.time()
3443         then = int(now - 2000)
3444         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3445                                        expiration_enabled=True,
3446                                        expiration_mode="cutoff-date",
3447                                        expiration_cutoff_date=then)
3448         # make it start sooner than usual.
3449         lc = ss.lease_checker
3450         lc.slow_start = 0
3451         lc.stop_after_first_bucket = True
3452         webstatus = StorageStatus(ss)
3453
3454         # create a few shares, with some leases on them
3455         self.make_shares(ss)
3456         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3457
3458         def count_shares(si):
3459             return len(list(ss._iter_share_files(si)))
3460         def _get_sharefile(si):
3461             return list(ss._iter_share_files(si))[0]
3462         def count_leases(si):
3463             return len(list(_get_sharefile(si).get_leases()))
3464
3465         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3466         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3467         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3468         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3469         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3470         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3471         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3472         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3473
3474         # artificially crank back the expiration time on the first lease of
3475         # each share, to make it look like it was renewed 3000s ago. To
3476         # achieve this, we need to set the expiration time to now-3000+31days.
3477         # This will change when the lease format is improved to contain both
3478         # create/renew time and duration.
3479         new_expiration_time = now - 3000 + 31*24*60*60
3480
3481         # Some shares have an extra lease which is set to expire at the
3482         # default time, 31 days from now (i.e. a fresh lease). We then run
3483         # the crawler, which will expire the first lease, making some shares
3484         # get deleted and others stay alive (with one remaining lease each).
3485
3486         sf0 = _get_sharefile(immutable_si_0)
3487         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3488         sf0_size = os.stat(sf0.home).st_size
3489
3490         # immutable_si_1 gets an extra lease
3491         sf1 = _get_sharefile(immutable_si_1)
3492         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3493
3494         sf2 = _get_sharefile(mutable_si_2)
3495         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3496         sf2_size = os.stat(sf2.home).st_size
3497
3498         # mutable_si_3 gets an extra lease
3499         sf3 = _get_sharefile(mutable_si_3)
3500         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3501
3502         ss.setServiceParent(self.s)
3503
3504         d = fireEventually()
3505         # examine the state right after the first bucket has been processed
3506         def _after_first_bucket(ignored):
3507             p = lc.get_progress()
3508             if not p["cycle-in-progress"]:
3509                 d2 = fireEventually()
3510                 d2.addCallback(_after_first_bucket)
3511                 return d2
3512         d.addCallback(_after_first_bucket)
3513         d.addCallback(lambda ign: self.render1(webstatus))
3514         def _check_html_in_cycle(html):
3515             s = remove_tags(html)
3516             # the first bucket encountered gets deleted, and its prefix
3517             # happens to be about 1/5th of the way through the ring, so the
3518             # predictor thinks we'll have 5 shares and that we'll delete them
3519             # all. This part of the test depends upon the SIs landing right
3520             # where they do now.
3521             self.failUnlessIn("The remainder of this cycle is expected to "
3522                               "recover: 4 shares, 4 buckets", s)
3523             self.failUnlessIn("The whole cycle is expected to examine "
3524                               "5 shares in 5 buckets and to recover: "
3525                               "5 shares, 5 buckets", s)
3526         d.addCallback(_check_html_in_cycle)
3527
3528         # wait for the crawler to finish the first cycle. Two shares should
3529         # have been removed
3530         def _wait():
3531             return lc.get_state()["last-cycle-finished"] is not None
3532         d.addCallback(lambda ign: self.poll(_wait))
3533
3534         def _after_first_cycle(ignored):
3535             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3536             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3537             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3538             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3539             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3540             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3541
3542             s = lc.get_state()
3543             last = s["history"][0]
3544
3545             self.failUnlessEqual(last["expiration-enabled"], True)
3546             self.failUnlessEqual(last["configured-expiration-mode"],
3547                                  ("cutoff-date", None, then,
3548                                   ("mutable", "immutable")))
3549             self.failUnlessEqual(last["leases-per-share-histogram"],
3550                                  {1: 2, 2: 2})
3551
3552             rec = last["space-recovered"]
3553             self.failUnlessEqual(rec["examined-buckets"], 4)
3554             self.failUnlessEqual(rec["examined-shares"], 4)
3555             self.failUnlessEqual(rec["actual-buckets"], 2)
3556             self.failUnlessEqual(rec["original-buckets"], 0)
3557             self.failUnlessEqual(rec["configured-buckets"], 2)
3558             self.failUnlessEqual(rec["actual-shares"], 2)
3559             self.failUnlessEqual(rec["original-shares"], 0)
3560             self.failUnlessEqual(rec["configured-shares"], 2)
3561             size = sf0_size + sf2_size
3562             self.failUnlessEqual(rec["actual-sharebytes"], size)
3563             self.failUnlessEqual(rec["original-sharebytes"], 0)
3564             self.failUnlessEqual(rec["configured-sharebytes"], size)
3565             # different platforms have different notions of "blocks used by
3566             # this file", so merely assert that it's a number
3567             self.failUnless(rec["actual-diskbytes"] >= 0,
3568                             rec["actual-diskbytes"])
3569             self.failUnless(rec["original-diskbytes"] >= 0,
3570                             rec["original-diskbytes"])
3571             self.failUnless(rec["configured-diskbytes"] >= 0,
3572                             rec["configured-diskbytes"])
3573         d.addCallback(_after_first_cycle)
3574         d.addCallback(lambda ign: self.render1(webstatus))
3575         def _check_html(html):
3576             s = remove_tags(html)
3577             self.failUnlessIn("Expiration Enabled:"
3578                               " expired leases will be removed", s)
3579             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3580             substr = "Leases created or last renewed before %s will be considered expired." % date
3581             self.failUnlessIn(substr, s)
3582             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3583         d.addCallback(_check_html)
3584         return d
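
    # Cutoff-date sketch (assumed semantics): in "cutoff-date" mode a
    # lease is expired when its inferred renew time falls before the
    # configured cutoff date, independent of any duration.
    @staticmethod
    def _cutoff_expired_sketch(lease_expiration_time, cutoff):
        renewed_at = lease_expiration_time - 31*24*60*60
        return renewed_at < cutoff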
3585
    def test_only_immutable(self):
        basedir = "storage/LeaseCrawler/only_immutable"
        fileutil.make_dirs(basedir)
        now = time.time()
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("immutable",))
        lc = ss.lease_checker
        lc.slow_start = 0
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # Backdate all leases so they are expirable: an expiration time of
        # (now - 3000) plus the standard 31-day lease duration makes each
        # lease look as if it was last renewed 3000s ago, i.e. before the
        # cutoff at (now - 2000).
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 0)
            self.failUnlessEqual(count_shares(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
        d.addCallback(_check_html)
        return d

    def test_only_mutable(self):
        basedir = "storage/LeaseCrawler/only_mutable"
        fileutil.make_dirs(basedir)
        now = time.time()
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("mutable",))
        lc = ss.lease_checker
        lc.slow_start = 0
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # Backdate all leases so they are expirable: an expiration time of
        # (now - 3000) plus the standard 31-day lease duration makes each
        # lease look as if it was last renewed 3000s ago, i.e. before the
        # cutoff at (now - 2000).
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 0)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
        d.addCallback(_check_html)
        return d

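    # A sketch of the sharetype filter the two tests above rely on (an
    # assumption about the expirer's classifier, for illustration only): a
    # share is only deleted when its container type appears in the
    # expiration_sharetypes tuple.
    def _example_sharetype_filter(self, sharefile, expiration_sharetypes):
        if isinstance(sharefile, MutableShareFile):
            sharetype = "mutable"
        else:
            sharetype = "immutable"
        return sharetype in expiration_sharetypes
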
    def test_bad_mode(self):
        basedir = "storage/LeaseCrawler/bad_mode"
        fileutil.make_dirs(basedir)
        e = self.failUnlessRaises(ValueError,
                                  StorageServer, basedir, "\x00" * 20,
                                  expiration_mode="bogus")
        self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))

    def test_parse_duration(self):
        DAY = 24*60*60
        MONTH = 31*DAY
        YEAR = 365*DAY
        p = time_format.parse_duration
        self.failUnlessEqual(p("7days"), 7*DAY)
        self.failUnlessEqual(p("31day"), 31*DAY)
        self.failUnlessEqual(p("60 days"), 60*DAY)
        self.failUnlessEqual(p("2mo"), 2*MONTH)
        self.failUnlessEqual(p("3 month"), 3*MONTH)
        self.failUnlessEqual(p("2years"), 2*YEAR)
        e = self.failUnlessRaises(ValueError, p, "2kumquats")
        self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))

    def test_parse_date(self):
        p = time_format.parse_date
        self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
        self.failUnlessEqual(p("2009-03-18"), 1237334400)

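    # Where the expected epoch above comes from: 2009-03-18 00:00 UTC,
    # derived here with only the stdlib (a worked example, independent of
    # time_format's implementation).
    def _example_expected_epoch(self):
        import calendar
        return calendar.timegm(time.strptime("2009-03-18", "%Y-%m-%d")) # 1237334400
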
    def test_limited_history(self):
        basedir = "storage/LeaseCrawler/limited_history"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        ss.setServiceParent(self.s)

        def _wait_until_15_cycles_done():
            last = lc.state["last-cycle-finished"]
            if last is not None and last >= 15:
                return True
            if lc.timer:
                lc.timer.reset(0)
            return False
        d = self.poll(_wait_until_15_cycles_done)

        def _check(ignored):
            s = lc.get_state()
            h = s["history"]
            self.failUnlessEqual(len(h), 10)
            self.failUnlessEqual(max(h.keys()), 15)
            self.failUnlessEqual(min(h.keys()), 6)
        d.addCallback(_check)
        return d

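    # A minimal sketch of the bounded-history bookkeeping verified above (an
    # assumption about the expirer's state handling, not its actual code):
    # history maps cycle-number to a summary dict, and older entries are
    # dropped so that only the 10 most recent cycles remain.
    def _example_trim_history(self, history, limit=10):
        while len(history) > limit:
            del history[min(history.keys())]
        return history
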
    def test_unpredictable_future(self):
        basedir = "storage/LeaseCrawler/unpredictable_future"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.cpu_slice = -1.0 # stop quickly

        self.make_shares(ss)

        ss.setServiceParent(self.s)

        d = fireEventually()
        def _check(ignored):
            # this should fire after the first bucket is complete, but before
            # the first prefix is complete, so the progress-measurer won't
            # think we've gotten far enough to raise our percent-complete
            # above 0%, triggering the cannot-predict-the-future code in
            # expirer.py. This will have to change if/when the
            # progress-measurer gets smart enough to count buckets (we'll
            # have to interrupt it even earlier, before it's finished the
            # first bucket).
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_check)
                return d2
            self.failUnlessIn("cycle-to-date", s)
            self.failUnlessIn("estimated-remaining-cycle", s)
            self.failUnlessIn("estimated-current-cycle", s)

            left = s["estimated-remaining-cycle"]["space-recovered"]
            self.failUnlessEqual(left["actual-buckets"], None)
            self.failUnlessEqual(left["original-buckets"], None)
            self.failUnlessEqual(left["configured-buckets"], None)
            self.failUnlessEqual(left["actual-shares"], None)
            self.failUnlessEqual(left["original-shares"], None)
            self.failUnlessEqual(left["configured-shares"], None)
            self.failUnlessEqual(left["actual-diskbytes"], None)
            self.failUnlessEqual(left["original-diskbytes"], None)
            self.failUnlessEqual(left["configured-diskbytes"], None)
            self.failUnlessEqual(left["actual-sharebytes"], None)
            self.failUnlessEqual(left["original-sharebytes"], None)
            self.failUnlessEqual(left["configured-sharebytes"], None)

            full = s["estimated-current-cycle"]["space-recovered"]
            self.failUnlessEqual(full["actual-buckets"], None)
            self.failUnlessEqual(full["original-buckets"], None)
            self.failUnlessEqual(full["configured-buckets"], None)
            self.failUnlessEqual(full["actual-shares"], None)
            self.failUnlessEqual(full["original-shares"], None)
            self.failUnlessEqual(full["configured-shares"], None)
            self.failUnlessEqual(full["actual-diskbytes"], None)
            self.failUnlessEqual(full["original-diskbytes"], None)
            self.failUnlessEqual(full["configured-diskbytes"], None)
            self.failUnlessEqual(full["actual-sharebytes"], None)
            self.failUnlessEqual(full["original-sharebytes"], None)
            self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
        return d

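    # A sketch of the cannot-predict-the-future rule exercised above (an
    # assumed paraphrase of expirer.py's estimator, for illustration only):
    # with 0% of the cycle complete there is no basis for extrapolation, so
    # every estimated counter is reported as None; otherwise counters are
    # scaled up to a whole-cycle estimate.
    def _example_estimate_cycle(self, so_far_counters, fraction_complete):
        if fraction_complete <= 0.0:
            return dict([(k, None) for k in so_far_counters])
        return dict([(k, v / fraction_complete)
                     for (k, v) in so_far_counters.items()])
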
    def test_no_st_blocks(self):
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_override_lease_duration= means every lease
        # has already expired, so the "configured-" space-recovered counts
        # will be non-zero

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
        d.addCallback(_check)
        return d

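    # A sketch of the disk-usage measurement being bypassed above (assumed
    # logic, for illustration): when the platform's os.stat() result carries
    # st_blocks, disk usage is st_blocks * 512 (st_blocks is conventionally
    # in 512-byte units); without it, the share's file size is the best
    # available answer, so diskbytes == sharebytes.
    def _example_disk_usage(self, path):
        s = os.stat(path)
        blocks = getattr(s, "st_blocks", None) # missing on some platforms
        if blocks is None:
            return s.st_size
        return blocks * 512
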
    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.

        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            s = lc.get_state()
            if "cycle-to-date" not in s:
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            so_far = s["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

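    # A sketch of the container-type sniffing described in the comments above
    # (an assumed paraphrase of get_share_file, for illustration only): the
    # leading magic decides mutable vs. immutable, so clobbering it makes any
    # share parse as an immutable container with an unknown version.
    def _example_sniff_container(self, fn):
        f = open(fn, "rb")
        header = f.read(32)
        f.close()
        if header.startswith(MutableShareFile.MAGIC):
            return "mutable"
        return "immutable" # the version check comes later, and fails here
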
    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        nodeid = "\x00" * 20
        ss = StorageServer(basedir, nodeid)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss, "nickname")
        d = self.render1(w)
        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            s = remove_tags(html)
            self.failUnlessIn("Server Nickname: nickname", s)
            self.failUnlessIn("Server Nodeid: %s" % base32.b2a(nodeid), s)
            self.failUnlessIn("Accepting new shares: Yes", s)
            self.failUnlessIn("Reserved space: - 0 B (0)", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_no_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = AttributeError()

        # Some platforms may have no disk stats API. Make sure the code can handle that
        # (test runs on all platforms).
        basedir = "storage/WebStatus/status_no_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnless(ss.get_available_space() is None)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_bad_disk_stats(self, mock_get_disk_stats):
        mock_get_disk_stats.side_effect = OSError()

        # If the API to get disk stats exists but a call to it fails, then the status should
        # show that no shares will be accepted, and get_available_space() should be 0.
        basedir = "storage/WebStatus/status_bad_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)
        self.failUnlessIn("Total disk space: ?", s)
        self.failUnlessIn("Space Available to Tahoe: ?", s)
        self.failUnlessEqual(ss.get_available_space(), 0)

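    # A sketch of the policy the two disk-stats tests above pin down (assumed
    # logic, for illustration): no disk-stats API at all means available
    # space is unknown (None), while an API that exists but fails means the
    # server plays it safe and reports 0 bytes available, accepting nothing.
    def _example_available_space(self, sharedir, reserved_space):
        try:
            stats = fileutil.get_disk_stats(sharedir, reserved_space)
        except AttributeError:
            return None # platform has no disk-stats API
        except EnvironmentError:
            return 0 # API exists but the call failed
        return stats["avail"]
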
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_right_disk_stats(self, mock_get_disk_stats):
        GB = 1000000000
        total            = 5*GB
        free_for_root    = 4*GB
        free_for_nonroot = 3*GB
        reserved_space   = 1*GB
        used = total - free_for_root
        avail = max(free_for_nonroot - reserved_space, 0)
        mock_get_disk_stats.return_value = {
            'total': total,
            'free_for_root': free_for_root,
            'free_for_nonroot': free_for_nonroot,
            'used': used,
            'avail': avail,
        }

        basedir = "storage/WebStatus/status_right_disk_stats"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
        expecteddir = ss.sharedir
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()

        self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
                    mock_get_disk_stats.call_args_list)

        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Total disk space: 5.00 GB", s)
        self.failUnlessIn("Disk space used: - 1.00 GB", s)
        self.failUnlessIn("Disk space free (root): 4.00 GB", s)
        self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
        self.failUnlessIn("Reserved space: - 1.00 GB", s)
        self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
        self.failUnlessEqual(ss.get_available_space(), 2*GB)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)