1 import time, os.path, platform, stat, re, simplejson, struct, shutil
2
3 import mock
4
5 from twisted.trial import unittest
6
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
10 import itertools
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22      ReadBucketProxy
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24                                      LayoutInvalid, MDMFSIGNABLEHEADER, \
25                                      SIGNED_PREFIX, MDMFHEADER, \
26                                      MDMFOFFSETS, SDMFSlotWriteProxy, \
27                                      PRIVATE_KEY_SIZE, \
28                                      SIGNATURE_SIZE, \
29                                      VERIFICATION_KEY_SIZE, \
30                                      SHARE_HASH_CHAIN_SIZE
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
36
37 class Marker:
38     pass
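# FakeCanary stands in for the canary object that a storage client passes to
# remote_allocate_buckets(): it records the callbacks registered via
# notifyOnDisconnect() so tests such as Server.test_disconnect can invoke them
# directly to simulate a client disconnecting mid-upload.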
39 class FakeCanary:
40     def __init__(self, ignore_disconnectors=False):
41         self.ignore = ignore_disconnectors
42         self.disconnectors = {}
43     def notifyOnDisconnect(self, f, *args, **kwargs):
44         if self.ignore:
45             return
46         m = Marker()
47         self.disconnectors[m] = (f, args, kwargs)
48         return m
49     def dontNotifyOnDisconnect(self, marker):
50         if self.ignore:
51             return
52         del self.disconnectors[marker]
53
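# FakeStatsProvider is a no-op stand-in for the node's stats provider; the
# Server tests hand it to StorageServer so that count() and
# register_producer() calls are simply discarded.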
54 class FakeStatsProvider:
55     def count(self, name, delta=1):
56         pass
57     def register_producer(self, producer):
58         pass
59
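# Bucket tests BucketWriter and BucketReader directly. The test case itself is
# passed in place of the storage server, so bucket_writer_closed(),
# add_latency() and count() below stand in for the storage-server callbacks
# the writer uses.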
60 class Bucket(unittest.TestCase):
61     def make_workdir(self, name):
62         basedir = os.path.join("storage", "Bucket", name)
63         incoming = os.path.join(basedir, "tmp", "bucket")
64         final = os.path.join(basedir, "bucket")
65         fileutil.make_dirs(basedir)
66         fileutil.make_dirs(os.path.join(basedir, "tmp"))
67         return incoming, final
68
69     def bucket_writer_closed(self, bw, consumed):
70         pass
71     def add_latency(self, category, latency):
72         pass
73     def count(self, name, delta=1):
74         pass
75
76     def make_lease(self):
77         owner_num = 0
78         renew_secret = os.urandom(32)
79         cancel_secret = os.urandom(32)
80         expiration_time = time.time() + 5000
81         return LeaseInfo(owner_num, renew_secret, cancel_secret,
82                          expiration_time, "\x00" * 20)
83
84     def test_create(self):
85         incoming, final = self.make_workdir("test_create")
86         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
87                           FakeCanary())
88         bw.remote_write(0, "a"*25)
89         bw.remote_write(25, "b"*25)
90         bw.remote_write(50, "c"*25)
91         bw.remote_write(75, "d"*7)
92         bw.remote_close()
93
94     def test_readwrite(self):
95         incoming, final = self.make_workdir("test_readwrite")
96         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
97                           FakeCanary())
98         bw.remote_write(0, "a"*25)
99         bw.remote_write(25, "b"*25)
100         bw.remote_write(50, "c"*7) # last block may be short
101         bw.remote_close()
102
103         # now read from it
104         br = BucketReader(self, bw.finalhome)
105         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
108
109     def test_read_past_end_of_share_data(self):
110         # test vector for immutable files (hard-coded contents of an immutable share
111         # file):
112
113         # The following immutable share file content is identical to that
114         # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
115         # with share data == 'a'. The total size of this content is 85
116         # bytes.
117
118         containerdata = struct.pack('>LLL', 1, 1, 1)
119
120         # A Tahoe-LAFS storage client would send as the share_data a
121         # complicated string involving hash trees and a URI Extension Block
122         # -- see allmydata/immutable/layout.py . This test, which is
123         # simulating a client, just sends 'a'.
124         share_data = 'a'
125
126         ownernumber = struct.pack('>L', 0)
127         renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
128         assert len(renewsecret) == 32
129         cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
130         assert len(cancelsecret) == 32
131         expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
132
133         lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
134
135         share_file_data = containerdata + share_data + lease_data
136
137         incoming, final = self.make_workdir("test_read_past_end_of_share_data")
138
139         fileutil.write(final, share_file_data)
140
141         mockstorageserver = mock.Mock()
142
143         # Now read from it.
144         br = BucketReader(mockstorageserver, final)
145
146         self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
147
148         # Try to read far enough past the end of the share data to reach the cancel secret; only the share data should come back.
149         read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
150
151         result_of_read = br.remote_read(0, read_length)
152         self.failUnlessEqual(result_of_read, share_data)
153
154         result_of_read = br.remote_read(0, len(share_data)+1)
155         self.failUnlessEqual(result_of_read, share_data)
156
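# RemoteBucket is a loopback substitute for a foolscap RemoteReference: its
# callRemote() dispatches directly to the matching remote_* method on
# self.target, and it keeps simple read/write counters.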
157 class RemoteBucket:
158
159     def __init__(self):
160         self.read_count = 0
161         self.write_count = 0
162
163     def callRemote(self, methname, *args, **kwargs):
164         def _call():
165             meth = getattr(self.target, "remote_" + methname)
166             return meth(*args, **kwargs)
167
168         if methname == "slot_readv":
169             self.read_count += 1
170         if "writev" in methname:
171             self.write_count += 1
172
173         return defer.maybeDeferred(_call)
174
175
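# BucketProxy drives the immutable-share layout helpers (WriteBucketProxy,
# WriteBucketProxy_v2, ReadBucketProxy) over a loopback RemoteBucket: it
# writes a small share, then reads every field back and compares.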
176 class BucketProxy(unittest.TestCase):
177     def make_bucket(self, name, size):
178         basedir = os.path.join("storage", "BucketProxy", name)
179         incoming = os.path.join(basedir, "tmp", "bucket")
180         final = os.path.join(basedir, "bucket")
181         fileutil.make_dirs(basedir)
182         fileutil.make_dirs(os.path.join(basedir, "tmp"))
183         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
184                           FakeCanary())
185         rb = RemoteBucket()
186         rb.target = bw
187         return bw, rb, final
188
189     def make_lease(self):
190         owner_num = 0
191         renew_secret = os.urandom(32)
192         cancel_secret = os.urandom(32)
193         expiration_time = time.time() + 5000
194         return LeaseInfo(owner_num, renew_secret, cancel_secret,
195                          expiration_time, "\x00" * 20)
196
197     def bucket_writer_closed(self, bw, consumed):
198         pass
199     def add_latency(self, category, latency):
200         pass
201     def count(self, name, delta=1):
202         pass
203
204     def test_create(self):
205         bw, rb, sharefname = self.make_bucket("test_create", 500)
206         bp = WriteBucketProxy(rb, None,
207                               data_size=300,
208                               block_size=10,
209                               num_segments=5,
210                               num_share_hashes=3,
211                               uri_extension_size_max=500)
212         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
213
214     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
215         # Let's pretend each share has 100 bytes of data, and that there are
216         # 4 segments (25 bytes each), and 8 shares total. So the two
217         # per-segment merkle trees (crypttext_hash_tree,
218         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
219         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
220         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
221         # long. That should make the whole share:
222         #
223         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
224         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
225
226         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
227
228         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
229                             for i in range(7)]
230         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
231                         for i in range(7)]
232         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
233                         for i in (1,9,13)]
234         uri_extension = "s" + "E"*498 + "e"
235
236         bw, rb, sharefname = self.make_bucket(name, sharesize)
237         bp = wbp_class(rb, None,
238                        data_size=95,
239                        block_size=25,
240                        num_segments=4,
241                        num_share_hashes=3,
242                        uri_extension_size_max=len(uri_extension))
243
244         d = bp.put_header()
245         d.addCallback(lambda res: bp.put_block(0, "a"*25))
246         d.addCallback(lambda res: bp.put_block(1, "b"*25))
247         d.addCallback(lambda res: bp.put_block(2, "c"*25))
248         d.addCallback(lambda res: bp.put_block(3, "d"*20))
249         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
250         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
251         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
252         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
253         d.addCallback(lambda res: bp.close())
254
255         # now read everything back
256         def _start_reading(res):
257             br = BucketReader(self, sharefname)
258             rb = RemoteBucket()
259             rb.target = br
260             server = NoNetworkServer("abc", None)
261             rbp = rbp_class(rb, server, storage_index="")
262             self.failUnlessIn("to peer", repr(rbp))
263             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
264
265             d1 = rbp.get_block_data(0, 25, 25)
266             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
267             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
268             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
269             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
270             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
271             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
272             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
273
274             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
275             d1.addCallback(lambda res:
276                            self.failUnlessEqual(res, crypttext_hashes))
277             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
278             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
279             d1.addCallback(lambda res: rbp.get_share_hashes())
280             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
281             d1.addCallback(lambda res: rbp.get_uri_extension())
282             d1.addCallback(lambda res:
283                            self.failUnlessEqual(res, uri_extension))
284
285             return d1
286
287         d.addCallback(_start_reading)
288
289         return d
290
291     def test_readwrite_v1(self):
292         return self._do_test_readwrite("test_readwrite_v1",
293                                        0x24, WriteBucketProxy, ReadBucketProxy)
294
295     def test_readwrite_v2(self):
296         return self._do_test_readwrite("test_readwrite_v2",
297                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
298
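# Server exercises the immutable-share side of StorageServer: allocating and
# closing buckets, reading shares back, reserved-space accounting, leases, and
# corruption advisories.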
299 class Server(unittest.TestCase):
300
301     def setUp(self):
302         self.sparent = LoggingServiceParent()
303         self.sparent.startService()
304         self._lease_secret = itertools.count()
305     def tearDown(self):
306         return self.sparent.stopService()
307
308     def workdir(self, name):
309         basedir = os.path.join("storage", "Server", name)
310         return basedir
311
312     def create(self, name, reserved_space=0, klass=StorageServer):
313         workdir = self.workdir(name)
314         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
315                    stats_provider=FakeStatsProvider())
316         ss.setServiceParent(self.sparent)
317         return ss
318
319     def test_create(self):
320         self.create("test_create")
321
322     def test_declares_fixed_1528(self):
323         ss = self.create("test_declares_fixed_1528")
324         ver = ss.remote_get_version()
325         sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
326         self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
327
328     def allocate(self, ss, storage_index, sharenums, size, canary=None):
329         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
330         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
331         if not canary:
332             canary = FakeCanary()
333         return ss.remote_allocate_buckets(storage_index,
334                                           renew_secret, cancel_secret,
335                                           sharenums, size, canary)
336
337     def test_large_share(self):
338         syslow = platform.system().lower()
339         if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
340             raise unittest.SkipTest("This test is very expensive on filesystems without efficient sparse-file support (Mac OS X and Windows don't support efficient sparse files).")
341
342         avail = fileutil.get_available_space('.', 512*2**20)
343         if avail <= 4*2**30:
344             raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
345
346         ss = self.create("test_large_share")
347
348         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
349         self.failUnlessEqual(already, set())
350         self.failUnlessEqual(set(writers.keys()), set([0]))
351
352         shnum, bucket = writers.items()[0]
353         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
354         bucket.remote_write(2**32, "ab")
355         bucket.remote_close()
356
357         readers = ss.remote_get_buckets("allocate")
358         reader = readers[shnum]
359         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
360
361     def test_dont_overfill_dirs(self):
362         """
363         This test asserts that if you add a second share whose storage index
364         share lots of leading bits with an extant share (but isn't the exact
365         same storage index), this won't add an entry to the share directory.
366         """
367         ss = self.create("test_dont_overfill_dirs")
368         already, writers = self.allocate(ss, "storageindex", [0], 10)
369         for i, wb in writers.items():
370             wb.remote_write(0, "%10d" % i)
371             wb.remote_close()
372         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
373                                 "shares")
374         children_of_storedir = set(os.listdir(storedir))
375
376         # Now store another one under another storageindex that has leading
377         # chars the same as the first storageindex.
378         already, writers = self.allocate(ss, "storageindey", [0], 10)
379         for i, wb in writers.items():
380             wb.remote_write(0, "%10d" % i)
381             wb.remote_close()
382         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
383                                 "shares")
384         new_children_of_storedir = set(os.listdir(storedir))
385         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
386
387     def test_remove_incoming(self):
388         ss = self.create("test_remove_incoming")
389         already, writers = self.allocate(ss, "vid", range(3), 10)
390         for i,wb in writers.items():
391             wb.remote_write(0, "%10d" % i)
392             wb.remote_close()
393         incoming_share_dir = wb.incominghome
394         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
395         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
396         incoming_dir = os.path.dirname(incoming_prefix_dir)
397         self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
398         self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
399         self.failUnless(os.path.exists(incoming_dir), incoming_dir)
400
401     def test_abort(self):
402         # remote_abort, when called on a writer, should make sure that
403         # the allocated size of the bucket is not counted by the storage
404         # server when accounting for space.
405         ss = self.create("test_abort")
406         already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
407         self.failIfEqual(ss.allocated_size(), 0)
408
409         # Now abort the writers.
410         for writer in writers.itervalues():
411             writer.remote_abort()
412         self.failUnlessEqual(ss.allocated_size(), 0)
413
414
415     def test_allocate(self):
416         ss = self.create("test_allocate")
417
418         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
419
420         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
421         self.failUnlessEqual(already, set())
422         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
423
424         # while the buckets are open, they should not count as readable
425         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
426
427         # close the buckets
428         for i,wb in writers.items():
429             wb.remote_write(0, "%25d" % i)
430             wb.remote_close()
431             # aborting a bucket that was already closed is a no-op
432             wb.remote_abort()
433
434         # now they should be readable
435         b = ss.remote_get_buckets("allocate")
436         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
437         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
438         b_str = str(b[0])
439         self.failUnlessIn("BucketReader", b_str)
440         self.failUnlessIn("mfwgy33dmf2g 0", b_str)
441
442         # now if we ask about writing again, the server should offer those
443         # three buckets as already present. It should offer them even if we
444         # don't ask about those specific ones.
445         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
446         self.failUnlessEqual(already, set([0,1,2]))
447         self.failUnlessEqual(set(writers.keys()), set([3,4]))
448
449         # while those two buckets are open for writing, the server should
450         # refuse to offer them to uploaders
451
452         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
453         self.failUnlessEqual(already2, set([0,1,2]))
454         self.failUnlessEqual(set(writers2.keys()), set([5]))
455
456         # aborting the writes should remove the tempfiles
457         for i,wb in writers2.items():
458             wb.remote_abort()
459         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
460         self.failUnlessEqual(already2, set([0,1,2]))
461         self.failUnlessEqual(set(writers2.keys()), set([5]))
462
463         for i,wb in writers2.items():
464             wb.remote_abort()
465         for i,wb in writers.items():
466             wb.remote_abort()
467
468     def test_bad_container_version(self):
469         ss = self.create("test_bad_container_version")
470         a,w = self.allocate(ss, "si1", [0], 10)
471         w[0].remote_write(0, "\xff"*10)
472         w[0].remote_close()
473
474         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
475         f = open(fn, "rb+")
476         f.seek(0)
477         f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
478         f.close()
479
480         ss.remote_get_buckets("allocate")
481
482         e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
483                                   ss.remote_get_buckets, "si1")
484         self.failUnlessIn(" had version 0 but we wanted 1", str(e))
485
486     def test_disconnect(self):
487         # simulate a disconnection
488         ss = self.create("test_disconnect")
489         canary = FakeCanary()
490         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
491         self.failUnlessEqual(already, set())
492         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
493         for (f,args,kwargs) in canary.disconnectors.values():
494             f(*args, **kwargs)
495         del already
496         del writers
497
498         # that ought to delete the incoming shares
499         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
500         self.failUnlessEqual(already, set())
501         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
502
503     @mock.patch('allmydata.util.fileutil.get_disk_stats')
504     def test_reserved_space(self, mock_get_disk_stats):
505         reserved_space=10000
506         mock_get_disk_stats.return_value = {
507             'free_for_nonroot': 15000,
508             'avail': max(15000 - reserved_space, 0),
509             }
510
511         ss = self.create("test_reserved_space", reserved_space=reserved_space)
512         # 15k available, 10k reserved, leaves 5k for shares
513
514         # a newly created and filled share incurs this much overhead, beyond
515         # the size we request.
516         OVERHEAD = 3*4
517         LEASE_SIZE = 4+32+32+4
518         canary = FakeCanary(True)
519         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
520         self.failUnlessEqual(len(writers), 3)
521         # now the StorageServer should have 3000 bytes provisionally
522         # allocated, allowing only 2000 more to be claimed
523         self.failUnlessEqual(len(ss._active_writers), 3)
524
525         # allocating 1001-byte shares only leaves room for one
526         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
527         self.failUnlessEqual(len(writers2), 1)
528         self.failUnlessEqual(len(ss._active_writers), 4)
529
530         # we abandon the first set, so their provisional allocation should be
531         # returned
532         del already
533         del writers
534         self.failUnlessEqual(len(ss._active_writers), 1)
535         # now we have a provisional allocation of 1001 bytes
536
537         # and we close the second set, so their provisional allocation should
538         # become real, long-term allocation, and grows to include the
539         # overhead.
540         for bw in writers2.values():
541             bw.remote_write(0, "a"*25)
542             bw.remote_close()
543         del already2
544         del writers2
545         del bw
546         self.failUnlessEqual(len(ss._active_writers), 0)
547
548         allocated = 1001 + OVERHEAD + LEASE_SIZE
549
550         # we have to manually increase available, since we're not doing real
551         # disk measurements
552         mock_get_disk_stats.return_value = {
553             'free_for_nonroot': 15000 - allocated,
554             'avail': max(15000 - allocated - reserved_space, 0),
555             }
556
557         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
558         # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
559         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
560         self.failUnlessEqual(len(writers3), 39)
561         self.failUnlessEqual(len(ss._active_writers), 39)
562
563         del already3
564         del writers3
565         self.failUnlessEqual(len(ss._active_writers), 0)
566         ss.disownServiceParent()
567         del ss
568
569     def test_seek(self):
570         basedir = self.workdir("test_seek_behavior")
571         fileutil.make_dirs(basedir)
572         filename = os.path.join(basedir, "testfile")
573         f = open(filename, "wb")
574         f.write("start")
575         f.close()
576         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
577         # files. mode="a" preserves previous contents but does not allow
578         # seeking-to-create-holes. mode="r+" allows both.
579         f = open(filename, "rb+")
580         f.seek(100)
581         f.write("100")
582         f.close()
583         filelen = os.stat(filename)[stat.ST_SIZE]
584         self.failUnlessEqual(filelen, 100+3)
585         f2 = open(filename, "rb")
586         self.failUnlessEqual(f2.read(5), "start")
587
588
589     def test_leases(self):
590         ss = self.create("test_leases")
591         canary = FakeCanary()
592         sharenums = range(5)
593         size = 100
594
595         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
596                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
597         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
598                                                      sharenums, size, canary)
599         self.failUnlessEqual(len(already), 0)
600         self.failUnlessEqual(len(writers), 5)
601         for wb in writers.values():
602             wb.remote_close()
603
604         leases = list(ss.get_leases("si0"))
605         self.failUnlessEqual(len(leases), 1)
606         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
607
608         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
609                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
610         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
611                                                      sharenums, size, canary)
612         for wb in writers.values():
613             wb.remote_close()
614
615         # take out a second lease on si1
616         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
617                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
618         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
619                                                      sharenums, size, canary)
620         self.failUnlessEqual(len(already), 5)
621         self.failUnlessEqual(len(writers), 0)
622
623         leases = list(ss.get_leases("si1"))
624         self.failUnlessEqual(len(leases), 2)
625         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
626
627         # and a third lease, using add-lease
628         rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
629                      hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
630         ss.remote_add_lease("si1", rs2a, cs2a)
631         leases = list(ss.get_leases("si1"))
632         self.failUnlessEqual(len(leases), 3)
633         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
634
635         # add-lease on a missing storage index is silently ignored
636         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
637
638         # check that si0 is readable
639         readers = ss.remote_get_buckets("si0")
640         self.failUnlessEqual(len(readers), 5)
641
642         # renew the first lease. Only the proper renew_secret should work
643         ss.remote_renew_lease("si0", rs0)
644         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
645         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
646
647         # check that si0 is still readable
648         readers = ss.remote_get_buckets("si0")
649         self.failUnlessEqual(len(readers), 5)
650
651         # There is no such method as remote_cancel_lease for now -- see
652         # ticket #1528.
653         self.failIf(hasattr(ss, 'remote_cancel_lease'), \
654                         "ss should not have a 'remote_cancel_lease' method/attribute")
655
656         # test overlapping uploads
657         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
658                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
659         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
660                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
661         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
662                                                      sharenums, size, canary)
663         self.failUnlessEqual(len(already), 0)
664         self.failUnlessEqual(len(writers), 5)
665         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
666                                                        sharenums, size, canary)
667         self.failUnlessEqual(len(already2), 0)
668         self.failUnlessEqual(len(writers2), 0)
669         for wb in writers.values():
670             wb.remote_close()
671
672         leases = list(ss.get_leases("si3"))
673         self.failUnlessEqual(len(leases), 1)
674
675         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
676                                                        sharenums, size, canary)
677         self.failUnlessEqual(len(already3), 5)
678         self.failUnlessEqual(len(writers3), 0)
679
680         leases = list(ss.get_leases("si3"))
681         self.failUnlessEqual(len(leases), 2)
682
683     def test_readonly(self):
684         workdir = self.workdir("test_readonly")
685         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
686         ss.setServiceParent(self.sparent)
687
688         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
689         self.failUnlessEqual(already, set())
690         self.failUnlessEqual(writers, {})
691
692         stats = ss.get_stats()
693         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
694         if "storage_server.disk_avail" in stats:
695             # Some platforms may not have an API to get disk stats.
696             # But if there are stats, readonly_storage means disk_avail=0
697             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
698
699     def test_discard(self):
700         # discard is really only used for other tests, but we test it anyway
701         workdir = self.workdir("test_discard")
702         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
703         ss.setServiceParent(self.sparent)
704
705         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
706         self.failUnlessEqual(already, set())
707         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
708         for i,wb in writers.items():
709             wb.remote_write(0, "%25d" % i)
710             wb.remote_close()
711         # since we discard the data, the shares should be present but sparse.
712         # Since we write with some seeks, the data we read back will be all
713         # zeros.
714         b = ss.remote_get_buckets("vid")
715         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
716         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
717
718     def test_advise_corruption(self):
719         workdir = self.workdir("test_advise_corruption")
720         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
721         ss.setServiceParent(self.sparent)
722
723         si0_s = base32.b2a("si0")
724         ss.remote_advise_corrupt_share("immutable", "si0", 0,
725                                        "This share smells funny.\n")
726         reportdir = os.path.join(workdir, "corruption-advisories")
727         reports = os.listdir(reportdir)
728         self.failUnlessEqual(len(reports), 1)
729         report_si0 = reports[0]
730         self.failUnlessIn(si0_s, report_si0)
731         f = open(os.path.join(reportdir, report_si0), "r")
732         report = f.read()
733         f.close()
734         self.failUnlessIn("type: immutable", report)
735         self.failUnlessIn("storage_index: %s" % si0_s, report)
736         self.failUnlessIn("share_number: 0", report)
737         self.failUnlessIn("This share smells funny.", report)
738
739         # test the RIBucketWriter version too
740         si1_s = base32.b2a("si1")
741         already,writers = self.allocate(ss, "si1", [1], 75)
742         self.failUnlessEqual(already, set())
743         self.failUnlessEqual(set(writers.keys()), set([1]))
744         writers[1].remote_write(0, "data")
745         writers[1].remote_close()
746
747         b = ss.remote_get_buckets("si1")
748         self.failUnlessEqual(set(b.keys()), set([1]))
749         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
750
751         reports = os.listdir(reportdir)
752         self.failUnlessEqual(len(reports), 2)
753         report_si1 = [r for r in reports if si1_s in r][0]
754         f = open(os.path.join(reportdir, report_si1), "r")
755         report = f.read()
756         f.close()
757         self.failUnlessIn("type: immutable", report)
758         self.failUnlessIn("storage_index: %s" % si1_s, report)
759         self.failUnlessIn("share_number: 1", report)
760         self.failUnlessIn("This share tastes like dust.", report)
761
762
763
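# MutableServer exercises the mutable-slot API of StorageServer:
# remote_slot_testv_and_readv_and_writev and remote_slot_readv, along with
# container resizing and lease handling for mutable shares.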
764 class MutableServer(unittest.TestCase):
765
766     def setUp(self):
767         self.sparent = LoggingServiceParent()
768         self._lease_secret = itertools.count()
769     def tearDown(self):
770         return self.sparent.stopService()
771
772     def workdir(self, name):
773         basedir = os.path.join("storage", "MutableServer", name)
774         return basedir
775
776     def create(self, name):
777         workdir = self.workdir(name)
778         ss = StorageServer(workdir, "\x00" * 20)
779         ss.setServiceParent(self.sparent)
780         return ss
781
782     def test_create(self):
783         self.create("test_create")
784
785     def write_enabler(self, we_tag):
786         return hashutil.tagged_hash("we_blah", we_tag)
787
788     def renew_secret(self, tag):
789         return hashutil.tagged_hash("renew_blah", str(tag))
790
791     def cancel_secret(self, tag):
792         return hashutil.tagged_hash("cancel_blah", str(tag))
793
794     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
795         write_enabler = self.write_enabler(we_tag)
796         renew_secret = self.renew_secret(lease_tag)
797         cancel_secret = self.cancel_secret(lease_tag)
798         rstaraw = ss.remote_slot_testv_and_readv_and_writev
799         testandwritev = dict( [ (shnum, ([], [], None) )
800                          for shnum in sharenums ] )
801         readv = []
802         rc = rstaraw(storage_index,
803                      (write_enabler, renew_secret, cancel_secret),
804                      testandwritev,
805                      readv)
806         (did_write, readv_data) = rc
807         self.failUnless(did_write)
808         self.failUnless(isinstance(readv_data, dict))
809         self.failUnlessEqual(len(readv_data), 0)
810
811     def test_bad_magic(self):
812         ss = self.create("test_bad_magic")
813         self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
814         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
815         f = open(fn, "rb+")
816         f.seek(0)
817         f.write("BAD MAGIC")
818         f.close()
819         read = ss.remote_slot_readv
820         e = self.failUnlessRaises(UnknownMutableContainerVersionError,
821                                   read, "si1", [0], [(0,10)])
822         self.failUnlessIn(" had magic ", str(e))
823         self.failUnlessIn(" but we wanted ", str(e))
824
825     def test_container_size(self):
826         ss = self.create("test_container_size")
827         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
828                       set([0,1,2]), 100)
829         read = ss.remote_slot_readv
830         rstaraw = ss.remote_slot_testv_and_readv_and_writev
831         secrets = ( self.write_enabler("we1"),
832                     self.renew_secret("we1"),
833                     self.cancel_secret("we1") )
834         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
835         answer = rstaraw("si1", secrets,
836                          {0: ([], [(0,data)], len(data)+12)},
837                          [])
838         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
839
840         # trying to make the container too large will raise an exception
841         TOOBIG = MutableShareFile.MAX_SIZE + 10
842         self.failUnlessRaises(DataTooLargeError,
843                               rstaraw, "si1", secrets,
844                               {0: ([], [(0,data)], TOOBIG)},
845                               [])
846
847         # it should be possible to make the container smaller, although at
848         # the moment this doesn't actually affect the share, unless the
849         # container size is dropped to zero, in which case the share is
850         # deleted.
851         answer = rstaraw("si1", secrets,
852                          {0: ([], [(0,data)], len(data)+8)},
853                          [])
854         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
855
856         answer = rstaraw("si1", secrets,
857                          {0: ([], [(0,data)], 0)},
858                          [])
859         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
860
861         read_answer = read("si1", [0], [(0,10)])
862         self.failUnlessEqual(read_answer, {})
863
864     def test_allocate(self):
865         ss = self.create("test_allocate")
866         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
867                       set([0,1,2]), 100)
868
869         read = ss.remote_slot_readv
870         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
871                              {0: [""]})
872         self.failUnlessEqual(read("si1", [], [(0, 10)]),
873                              {0: [""], 1: [""], 2: [""]})
874         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
875                              {0: [""]})
876
877         # try writing to one
878         secrets = ( self.write_enabler("we1"),
879                     self.renew_secret("we1"),
880                     self.cancel_secret("we1") )
881         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
882         write = ss.remote_slot_testv_and_readv_and_writev
883         answer = write("si1", secrets,
884                        {0: ([], [(0,data)], None)},
885                        [])
886         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
887
888         self.failUnlessEqual(read("si1", [0], [(0,20)]),
889                              {0: ["00000000001111111111"]})
890         self.failUnlessEqual(read("si1", [0], [(95,10)]),
891                              {0: ["99999"]})
892         #self.failUnlessEqual(s0.remote_get_length(), 100)
893
894         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
895         f = self.failUnlessRaises(BadWriteEnablerError,
896                                   write, "si1", bad_secrets,
897                                   {}, [])
898         self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)
899
900         # this testv should fail
901         answer = write("si1", secrets,
902                        {0: ([(0, 12, "eq", "444444444444"),
903                              (20, 5, "eq", "22222"),
904                              ],
905                             [(0, "x"*100)],
906                             None),
907                         },
908                        [(0,12), (20,5)],
909                        )
910         self.failUnlessEqual(answer, (False,
911                                       {0: ["000000000011", "22222"],
912                                        1: ["", ""],
913                                        2: ["", ""],
914                                        }))
915         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
916
917         # as should this one
918         answer = write("si1", secrets,
919                        {0: ([(10, 5, "lt", "11111"),
920                              ],
921                             [(0, "x"*100)],
922                             None),
923                         },
924                        [(10,5)],
925                        )
926         self.failUnlessEqual(answer, (False,
927                                       {0: ["11111"],
928                                        1: [""],
929                                        2: [""]},
930                                       ))
931         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
932
933
934     def test_operators(self):
935         # test operators; the data we're comparing is '11111' in all cases.
936         # test both fail+pass, reset data after each one.
937         ss = self.create("test_operators")
938
939         secrets = ( self.write_enabler("we1"),
940                     self.renew_secret("we1"),
941                     self.cancel_secret("we1") )
942         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
943         write = ss.remote_slot_testv_and_readv_and_writev
944         read = ss.remote_slot_readv
945
946         def reset():
947             write("si1", secrets,
948                   {0: ([], [(0,data)], None)},
949                   [])
950
951         reset()
952
953         #  lt
954         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
955                                              ],
956                                             [(0, "x"*100)],
957                                             None,
958                                             )}, [(10,5)])
959         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
960         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
961         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
962         reset()
963
964         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
965                                              ],
966                                             [(0, "x"*100)],
967                                             None,
968                                             )}, [(10,5)])
969         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
970         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
971         reset()
972
973         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
974                                              ],
975                                             [(0, "y"*100)],
976                                             None,
977                                             )}, [(10,5)])
978         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
979         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
980         reset()
981
982         #  le
983         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
984                                              ],
985                                             [(0, "x"*100)],
986                                             None,
987                                             )}, [(10,5)])
988         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
989         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
990         reset()
991
992         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
993                                              ],
994                                             [(0, "y"*100)],
995                                             None,
996                                             )}, [(10,5)])
997         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
998         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
999         reset()
1000
1001         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1002                                              ],
1003                                             [(0, "y"*100)],
1004                                             None,
1005                                             )}, [(10,5)])
1006         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1007         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1008         reset()
1009
1010         #  eq
1011         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1012                                              ],
1013                                             [(0, "x"*100)],
1014                                             None,
1015                                             )}, [(10,5)])
1016         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1017         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1018         reset()
1019
1020         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1021                                              ],
1022                                             [(0, "y"*100)],
1023                                             None,
1024                                             )}, [(10,5)])
1025         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1026         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1027         reset()
1028
1029         #  ne
1030         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1031                                              ],
1032                                             [(0, "x"*100)],
1033                                             None,
1034                                             )}, [(10,5)])
1035         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1036         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1037         reset()
1038
1039         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1040                                              ],
1041                                             [(0, "y"*100)],
1042                                             None,
1043                                             )}, [(10,5)])
1044         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1045         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1046         reset()
1047
1048         #  ge
1049         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1050                                              ],
1051                                             [(0, "y"*100)],
1052                                             None,
1053                                             )}, [(10,5)])
1054         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1055         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1056         reset()
1057
1058         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1059                                              ],
1060                                             [(0, "y"*100)],
1061                                             None,
1062                                             )}, [(10,5)])
1063         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1064         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1065         reset()
1066
1067         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1068                                              ],
1069                                             [(0, "y"*100)],
1070                                             None,
1071                                             )}, [(10,5)])
1072         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1073         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1074         reset()
1075
1076         #  gt
1077         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1078                                              ],
1079                                             [(0, "y"*100)],
1080                                             None,
1081                                             )}, [(10,5)])
1082         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1083         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1084         reset()
1085
1086         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1087                                              ],
1088                                             [(0, "x"*100)],
1089                                             None,
1090                                             )}, [(10,5)])
1091         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1092         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1093         reset()
1094
1095         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1096                                              ],
1097                                             [(0, "x"*100)],
1098                                             None,
1099                                             )}, [(10,5)])
1100         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1101         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1102         reset()
1103
1104         # finally, test some operators against empty shares
1105         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1106                                              ],
1107                                             [(0, "x"*100)],
1108                                             None,
1109                                             )}, [(10,5)])
1110         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1111         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1112         reset()
1113
1114     def test_readv(self):
1115         ss = self.create("test_readv")
1116         secrets = ( self.write_enabler("we1"),
1117                     self.renew_secret("we1"),
1118                     self.cancel_secret("we1") )
1119         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1120         write = ss.remote_slot_testv_and_readv_and_writev
1121         read = ss.remote_slot_readv
1122         data = [("%d" % i) * 100 for i in range(3)]
1123         rc = write("si1", secrets,
1124                    {0: ([], [(0,data[0])], None),
1125                     1: ([], [(0,data[1])], None),
1126                     2: ([], [(0,data[2])], None),
1127                     }, [])
1128         self.failUnlessEqual(rc, (True, {}))
1129
1130         answer = read("si1", [], [(0, 10)])
1131         self.failUnlessEqual(answer, {0: ["0"*10],
1132                                       1: ["1"*10],
1133                                       2: ["2"*10]})
1134
1135     def compare_leases_without_timestamps(self, leases_a, leases_b):
1136         self.failUnlessEqual(len(leases_a), len(leases_b))
1137         for i in range(len(leases_a)):
1138             a = leases_a[i]
1139             b = leases_b[i]
1140             self.failUnlessEqual(a.owner_num,       b.owner_num)
1141             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1142             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1143             self.failUnlessEqual(a.nodeid,          b.nodeid)
1144
1145     def compare_leases(self, leases_a, leases_b):
1146         self.failUnlessEqual(len(leases_a), len(leases_b))
1147         for i in range(len(leases_a)):
1148             a = leases_a[i]
1149             b = leases_b[i]
1150             self.failUnlessEqual(a.owner_num,       b.owner_num)
1151             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1152             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1153             self.failUnlessEqual(a.nodeid,          b.nodeid)
1154             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1155
1156     def test_leases(self):
1157         ss = self.create("test_leases")
1158         def secrets(n):
1159             return ( self.write_enabler("we1"),
1160                      self.renew_secret("we1-%d" % n),
1161                      self.cancel_secret("we1-%d" % n) )
1162         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1163         write = ss.remote_slot_testv_and_readv_and_writev
1164         read = ss.remote_slot_readv
1165         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1166         self.failUnlessEqual(rc, (True, {}))
1167
1168         # create a random non-numeric file in the bucket directory, to
1169         # exercise the code that's supposed to ignore those.
1170         bucket_dir = os.path.join(self.workdir("test_leases"),
1171                                   "shares", storage_index_to_dir("si1"))
1172         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1173         f.write("you ought to be ignoring me\n")
1174         f.close()
1175
1176         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1177         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1178
1179         # add-lease on a missing storage index is silently ignored
1180         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1181
1182         # re-allocate the slots and use the same secrets, that should update
1183         # the lease
1184         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1185         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1186
1187         # renew it directly
1188         ss.remote_renew_lease("si1", secrets(0)[1])
1189         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1190
1191         # now allocate them with a bunch of different secrets, to trigger the
1192         # extended lease code. Use add_lease for one of them.
1193         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1194         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1195         secrets2 = secrets(2)
1196         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1197         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1198         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1199         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1200         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1201
1202         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1203
1204         all_leases = list(s0.get_leases())
1205         # and write enough data to expand the container, forcing the server
1206         # to move the leases
1207         write("si1", secrets(0),
1208               {0: ([], [(0,data)], 200), },
1209               [])
1210
1211         # read back the leases, make sure they're still intact.
1212         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1213
1214         ss.remote_renew_lease("si1", secrets(0)[1])
1215         ss.remote_renew_lease("si1", secrets(1)[1])
1216         ss.remote_renew_lease("si1", secrets(2)[1])
1217         ss.remote_renew_lease("si1", secrets(3)[1])
1218         ss.remote_renew_lease("si1", secrets(4)[1])
1219         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1220         # get a new copy of the leases, with the current timestamps. Reading
1221         # data and failing to renew/cancel leases should leave the timestamps
1222         # alone.
1223         all_leases = list(s0.get_leases())
1224         # renewing with a bogus token should prompt an error message
1225
1226         # examine the exception thus raised, make sure the old nodeid is
1227         # present, to provide for share migration
1228         e = self.failUnlessRaises(IndexError,
1229                                   ss.remote_renew_lease, "si1",
1230                                   secrets(20)[1])
1231         e_s = str(e)
1232         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1233         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1234         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1235
1236         self.compare_leases(all_leases, list(s0.get_leases()))
1237
1238         # reading shares should not modify the timestamp
1239         read("si1", [], [(0,200)])
1240         self.compare_leases(all_leases, list(s0.get_leases()))
1241
1242         write("si1", secrets(0),
1243               {0: ([], [(200, "make me bigger")], None)}, [])
1244         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1245
1246         write("si1", secrets(0),
1247               {0: ([], [(500, "make me really bigger")], None)}, [])
1248         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1249
1250     def test_remove(self):
1251         ss = self.create("test_remove")
1252         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1253                       set([0,1,2]), 100)
1254         readv = ss.remote_slot_readv
1255         writev = ss.remote_slot_testv_and_readv_and_writev
1256         secrets = ( self.write_enabler("we1"),
1257                     self.renew_secret("we1"),
1258                     self.cancel_secret("we1") )
1259         # delete sh0 by setting its size to zero
1260         answer = writev("si1", secrets,
1261                         {0: ([], [], 0)},
1262                         [])
1263         # the answer should mention all the shares that existed before the
1264         # write
1265         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1266         # but a new read should show only sh1 and sh2
1267         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1268                              {1: [""], 2: [""]})
1269
1270         # delete sh1 by setting its size to zero
1271         answer = writev("si1", secrets,
1272                         {1: ([], [], 0)},
1273                         [])
1274         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1275         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1276                              {2: [""]})
1277
1278         # delete sh2 by setting its size to zero
1279         answer = writev("si1", secrets,
1280                         {2: ([], [], 0)},
1281                         [])
1282         self.failUnlessEqual(answer, (True, {2:[]}) )
1283         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1284                              {})
1285         # and the bucket directory should now be gone
1286         si = base32.b2a("si1")
1287         # note: this is a detail of the storage server implementation, and
1288         # may change in the future
1289         prefix = si[:2]
1290         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1291         bucketdir = os.path.join(prefixdir, si)
1292         self.failUnless(os.path.exists(prefixdir), prefixdir)
1293         self.failIf(os.path.exists(bucketdir), bucketdir)
1294
1295
1296 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1297     def setUp(self):
1298         self.sparent = LoggingServiceParent()
1299         self._lease_secret = itertools.count()
1300         self.ss = self.create("MDMFProxies storage test server")
1301         self.rref = RemoteBucket()
1302         self.rref.target = self.ss
1303         self.secrets = (self.write_enabler("we_secret"),
1304                         self.renew_secret("renew_secret"),
1305                         self.cancel_secret("cancel_secret"))
1306         self.segment = "aaaaaa"
1307         self.block = "aa"
1308         self.salt = "a" * 16
1309         self.block_hash = "a" * 32
1310         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1311         self.share_hash = self.block_hash
1312         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1313         self.signature = "foobarbaz"
1314         self.verification_key = "vvvvvv"
1315         self.encprivkey = "private"
1316         self.root_hash = self.block_hash
1317         self.salt_hash = self.root_hash
1318         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1319         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1320         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1321         # blockhashes and salt hashes are serialized in the same way,
1322         # only we lop off the first element and store that in the
1323         # header.
1324         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
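             # For example: of the six entries in salt_hash_tree above, only
             # elements 1 through 5 end up in salt_hash_tree_s; element 0 is
             # the value that belongs in the header.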
1325
1326
1327     def tearDown(self):
1328         self.sparent.stopService()
1329         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1330
1331
1332     def write_enabler(self, we_tag):
1333         return hashutil.tagged_hash("we_blah", we_tag)
1334
1335
1336     def renew_secret(self, tag):
1337         return hashutil.tagged_hash("renew_blah", str(tag))
1338
1339
1340     def cancel_secret(self, tag):
1341         return hashutil.tagged_hash("cancel_blah", str(tag))
1342
1343
1344     def workdir(self, name):
1345         basedir = os.path.join("storage", "MutableServer", name)
1346         return basedir
1347
1348
1349     def create(self, name):
1350         workdir = self.workdir(name)
1351         ss = StorageServer(workdir, "\x00" * 20)
1352         ss.setServiceParent(self.sparent)
1353         return ss
1354
1355
1356     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1357         # Start with the checkstring
1358         data = struct.pack(">BQ32s",
1359                            1,
1360                            0,
1361                            self.root_hash)
1362         self.checkstring = data
1363         # Next, the encoding parameters
1364         if tail_segment:
1365             data += struct.pack(">BBQQ",
1366                                 3,
1367                                 10,
1368                                 6,
1369                                 33)
1370         elif empty:
1371             data += struct.pack(">BBQQ",
1372                                 3,
1373                                 10,
1374                                 0,
1375                                 0)
1376         else:
1377             data += struct.pack(">BBQQ",
1378                                 3,
1379                                 10,
1380                                 6,
1381                                 36)
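             # At this point the header holds a 1-byte version number (1 for
             # MDMF), an 8-byte sequence number (0), the 32-byte root hash,
             # and then k=3, N=10, the segment size, and the data length.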
1382         # Now we'll build the offsets.
1383         sharedata = ""
1384         if not tail_segment and not empty:
1385             for i in xrange(6):
1386                 sharedata += self.salt + self.block
1387         elif tail_segment:
1388             for i in xrange(5):
1389                 sharedata += self.salt + self.block
1390             sharedata += self.salt + "a"
1391
1392         # The encrypted private key comes after the shares + salts
1393         offset_size = struct.calcsize(MDMFOFFSETS)
1394         encrypted_private_key_offset = len(data) + offset_size
1395         # The share hash chain comes after the private key
1396         sharehashes_offset = encrypted_private_key_offset + \
1397             len(self.encprivkey)
1398
1399         # The signature comes after the share hash chain.
1400         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1401
1402         verification_key_offset = signature_offset + len(self.signature)
1403         verification_key_end = verification_key_offset + \
1404             len(self.verification_key)
1405
1406         share_data_offset = offset_size
1407         share_data_offset += PRIVATE_KEY_SIZE
1408         share_data_offset += SIGNATURE_SIZE
1409         share_data_offset += VERIFICATION_KEY_SIZE
1410         share_data_offset += SHARE_HASH_CHAIN_SIZE
1411
1412         blockhashes_offset = share_data_offset + len(sharedata)
1413         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1414
1415         data += struct.pack(MDMFOFFSETS,
1416                             encrypted_private_key_offset,
1417                             sharehashes_offset,
1418                             signature_offset,
1419                             verification_key_offset,
1420                             verification_key_end,
1421                             share_data_offset,
1422                             blockhashes_offset,
1423                             eof_offset)
1424
1425         self.offsets = {}
1426         self.offsets['enc_privkey'] = encrypted_private_key_offset
1427         self.offsets['block_hash_tree'] = blockhashes_offset
1428         self.offsets['share_hash_chain'] = sharehashes_offset
1429         self.offsets['signature'] = signature_offset
1430         self.offsets['verification_key'] = verification_key_offset
1431         self.offsets['share_data'] = share_data_offset
1432         self.offsets['verification_key_end'] = verification_key_end
1433         self.offsets['EOF'] = eof_offset
1434
1435         # the private key,
1436         data += self.encprivkey
1437         # the sharehashes
1438         data += self.share_hash_chain_s
1439         # the signature,
1440         data += self.signature
1441         # and the verification key
1442         data += self.verification_key
1443         # Then we'll pad with spaces until we get to share_data_offset.
1444         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1445         data += nulls
1446
1447         # Then the share data
1448         data += sharedata
1449         # the blockhashes
1450         data += self.block_hash_tree_s
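             # The finished container is therefore laid out as: header, offset
             # table, encrypted private key, share hash chain, signature,
             # verification key, padding out to share_data_offset, share data,
             # and finally the block hash tree.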
1451         return data
1452
1453
1454     def write_test_share_to_server(self,
1455                                    storage_index,
1456                                    tail_segment=False,
1457                                    empty=False):
1458         """
1459         I write some test data to self.ss for the read tests to read.
1460
1461         If tail_segment=True, then I will write a share that has a
1462         smaller tail segment than other segments.
1463         """
1464         write = self.ss.remote_slot_testv_and_readv_and_writev
1465         data = self.build_test_mdmf_share(tail_segment, empty)
1466         # Finally, we write the whole thing to the storage server in one
1467         # pass.
1468         testvs = [(0, 1, "eq", "")]
1469         tws = {}
1470         tws[0] = (testvs, [(0, data)], None)
1471         readv = [(0, 1)]
1472         results = write(storage_index, self.secrets, tws, readv)
1473         self.failUnless(results[0])
1474
1475
1476     def build_test_sdmf_share(self, empty=False):
1477         if empty:
1478             sharedata = ""
1479         else:
1480             sharedata = self.segment * 6
1481         self.sharedata = sharedata
1482         blocksize = len(sharedata) / 3
1483         block = sharedata[:blocksize]
1484         self.blockdata = block
1485         prefix = struct.pack(">BQ32s16s BBQQ",
1486                              0, # version,
1487                              0,
1488                              self.root_hash,
1489                              self.salt,
1490                              3,
1491                              10,
1492                              len(sharedata),
1493                              len(sharedata),
1494                             )
1495         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1496         signature_offset = post_offset + len(self.verification_key)
1497         sharehashes_offset = signature_offset + len(self.signature)
1498         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1499         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1500         encprivkey_offset = sharedata_offset + len(block)
1501         eof_offset = encprivkey_offset + len(self.encprivkey)
1502         offsets = struct.pack(">LLLLQQ",
1503                               signature_offset,
1504                               sharehashes_offset,
1505                               blockhashes_offset,
1506                               sharedata_offset,
1507                               encprivkey_offset,
1508                               eof_offset)
1509         final_share = "".join([prefix,
1510                            offsets,
1511                            self.verification_key,
1512                            self.signature,
1513                            self.share_hash_chain_s,
1514                            self.block_hash_tree_s,
1515                            block,
1516                            self.encprivkey])
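             # An SDMF share is thus laid out as: prefix (version, seqnum,
             # root hash, salt, k, N, segsize, datalen), the ">LLLLQQ" offset
             # table, verification key, signature, share hash chain, block
             # hash tree, share data, and the encrypted private key.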
1517         self.offsets = {}
1518         self.offsets['signature'] = signature_offset
1519         self.offsets['share_hash_chain'] = sharehashes_offset
1520         self.offsets['block_hash_tree'] = blockhashes_offset
1521         self.offsets['share_data'] = sharedata_offset
1522         self.offsets['enc_privkey'] = encprivkey_offset
1523         self.offsets['EOF'] = eof_offset
1524         return final_share
1525
1526
1527     def write_sdmf_share_to_server(self,
1528                                    storage_index,
1529                                    empty=False):
1530         # Some tests need SDMF shares to verify that we can still read them.
1531         # This method writes one that resembles (but is not identical to) a real SDMF share.
1532         assert self.rref
1533         write = self.ss.remote_slot_testv_and_readv_and_writev
1534         share = self.build_test_sdmf_share(empty)
1535         testvs = [(0, 1, "eq", "")]
1536         tws = {}
1537         tws[0] = (testvs, [(0, share)], None)
1538         readv = []
1539         results = write(storage_index, self.secrets, tws, readv)
1540         self.failUnless(results[0])
1541
1542
1543     def test_read(self):
1544         self.write_test_share_to_server("si1")
1545         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1546         # Check that every method equals what we expect it to.
1547         d = defer.succeed(None)
1548         def _check_block_and_salt((block, salt)):
1549             self.failUnlessEqual(block, self.block)
1550             self.failUnlessEqual(salt, self.salt)
1551
1552         for i in xrange(6):
1553             d.addCallback(lambda ignored, i=i:
1554                 mr.get_block_and_salt(i))
1555             d.addCallback(_check_block_and_salt)
1556
1557         d.addCallback(lambda ignored:
1558             mr.get_encprivkey())
1559         d.addCallback(lambda encprivkey:
1560             self.failUnlessEqual(self.encprivkey, encprivkey))
1561
1562         d.addCallback(lambda ignored:
1563             mr.get_blockhashes())
1564         d.addCallback(lambda blockhashes:
1565             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1566
1567         d.addCallback(lambda ignored:
1568             mr.get_sharehashes())
1569         d.addCallback(lambda sharehashes:
1570             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1571
1572         d.addCallback(lambda ignored:
1573             mr.get_signature())
1574         d.addCallback(lambda signature:
1575             self.failUnlessEqual(signature, self.signature))
1576
1577         d.addCallback(lambda ignored:
1578             mr.get_verification_key())
1579         d.addCallback(lambda verification_key:
1580             self.failUnlessEqual(verification_key, self.verification_key))
1581
1582         d.addCallback(lambda ignored:
1583             mr.get_seqnum())
1584         d.addCallback(lambda seqnum:
1585             self.failUnlessEqual(seqnum, 0))
1586
1587         d.addCallback(lambda ignored:
1588             mr.get_root_hash())
1589         d.addCallback(lambda root_hash:
1590             self.failUnlessEqual(self.root_hash, root_hash))
1591
1592         d.addCallback(lambda ignored:
1593             mr.get_seqnum())
1594         d.addCallback(lambda seqnum:
1595             self.failUnlessEqual(0, seqnum))
1596
1597         d.addCallback(lambda ignored:
1598             mr.get_encoding_parameters())
1599         def _check_encoding_parameters((k, n, segsize, datalen)):
1600             self.failUnlessEqual(k, 3)
1601             self.failUnlessEqual(n, 10)
1602             self.failUnlessEqual(segsize, 6)
1603             self.failUnlessEqual(datalen, 36)
1604         d.addCallback(_check_encoding_parameters)
1605
1606         d.addCallback(lambda ignored:
1607             mr.get_checkstring())
1608         d.addCallback(lambda checkstring:
1609             self.failUnlessEqual(checkstring, self.checkstring))
1610         return d
1611
1612
1613     def test_read_with_different_tail_segment_size(self):
1614         self.write_test_share_to_server("si1", tail_segment=True)
1615         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1616         d = mr.get_block_and_salt(5)
1617         def _check_tail_segment(results):
1618             block, salt = results
1619             self.failUnlessEqual(len(block), 1)
1620             self.failUnlessEqual(block, "a")
1621         d.addCallback(_check_tail_segment)
1622         return d
1623
1624
1625     def test_get_block_with_invalid_segnum(self):
1626         self.write_test_share_to_server("si1")
1627         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1628         d = defer.succeed(None)
1629         d.addCallback(lambda ignored:
1630             self.shouldFail(LayoutInvalid, "test invalid segnum",
1631                             None,
1632                             mr.get_block_and_salt, 7))
1633         return d
1634
1635
1636     def test_get_encoding_parameters_first(self):
1637         self.write_test_share_to_server("si1")
1638         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1639         d = mr.get_encoding_parameters()
1640         def _check_encoding_parameters((k, n, segment_size, datalen)):
1641             self.failUnlessEqual(k, 3)
1642             self.failUnlessEqual(n, 10)
1643             self.failUnlessEqual(segment_size, 6)
1644             self.failUnlessEqual(datalen, 36)
1645         d.addCallback(_check_encoding_parameters)
1646         return d
1647
1648
1649     def test_get_seqnum_first(self):
1650         self.write_test_share_to_server("si1")
1651         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1652         d = mr.get_seqnum()
1653         d.addCallback(lambda seqnum:
1654             self.failUnlessEqual(seqnum, 0))
1655         return d
1656
1657
1658     def test_get_root_hash_first(self):
1659         self.write_test_share_to_server("si1")
1660         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1661         d = mr.get_root_hash()
1662         d.addCallback(lambda root_hash:
1663             self.failUnlessEqual(root_hash, self.root_hash))
1664         return d
1665
1666
1667     def test_get_checkstring_first(self):
1668         self.write_test_share_to_server("si1")
1669         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1670         d = mr.get_checkstring()
1671         d.addCallback(lambda checkstring:
1672             self.failUnlessEqual(checkstring, self.checkstring))
1673         return d
1674
1675
1676     def test_write_read_vectors(self):
1677         # When we write, the storage server returns a read vector along
1678         # with the write result. If a write fails because the test
1679         # vectors failed, this read vector can help us to diagnose the
1680         # problem. This test ensures that the read vector is working
1681         # appropriately.
1682         mw = self._make_new_mw("si1", 0)
1683
1684         for i in xrange(6):
1685             mw.put_block(self.block, i, self.salt)
1686         mw.put_encprivkey(self.encprivkey)
1687         mw.put_blockhashes(self.block_hash_tree)
1688         mw.put_sharehashes(self.share_hash_chain)
1689         mw.put_root_hash(self.root_hash)
1690         mw.put_signature(self.signature)
1691         mw.put_verification_key(self.verification_key)
1692         d = mw.finish_publishing()
1693         def _then(results):
1694             self.failUnlessEqual(len(results), 2)
1695             result, readv = results
1696             self.failUnless(result)
1697             self.failIf(readv)
1698             self.old_checkstring = mw.get_checkstring()
1699             mw.set_checkstring("")
1700         d.addCallback(_then)
1701         d.addCallback(lambda ignored:
1702             mw.finish_publishing())
1703         def _then_again(results):
1704             self.failUnlessEqual(len(results), 2)
1705             result, readvs = results
1706             self.failIf(result)
1707             self.failUnlessIn(0, readvs)
1708             readv = readvs[0][0]
1709             self.failUnlessEqual(readv, self.old_checkstring)
1710         d.addCallback(_then_again)
1711         # The checkstring remains the same for the rest of the process.
1712         return d
1713
1714
1715     def test_private_key_after_share_hash_chain(self):
1716         mw = self._make_new_mw("si1", 0)
1717         d = defer.succeed(None)
1718         for i in xrange(6):
1719             d.addCallback(lambda ignored, i=i:
1720                 mw.put_block(self.block, i, self.salt))
1721         d.addCallback(lambda ignored:
1722             mw.put_encprivkey(self.encprivkey))
1723         d.addCallback(lambda ignored:
1724             mw.put_sharehashes(self.share_hash_chain))
1725
1726         # Now try to put the private key again.
1727         d.addCallback(lambda ignored:
1728             self.shouldFail(LayoutInvalid, "test repeat private key",
1729                             None,
1730                             mw.put_encprivkey, self.encprivkey))
1731         return d
1732
1733
1734     def test_signature_after_verification_key(self):
1735         mw = self._make_new_mw("si1", 0)
1736         d = defer.succeed(None)
1737         # Put everything up to and including the verification key.
1738         for i in xrange(6):
1739             d.addCallback(lambda ignored, i=i:
1740                 mw.put_block(self.block, i, self.salt))
1741         d.addCallback(lambda ignored:
1742             mw.put_encprivkey(self.encprivkey))
1743         d.addCallback(lambda ignored:
1744             mw.put_blockhashes(self.block_hash_tree))
1745         d.addCallback(lambda ignored:
1746             mw.put_sharehashes(self.share_hash_chain))
1747         d.addCallback(lambda ignored:
1748             mw.put_root_hash(self.root_hash))
1749         d.addCallback(lambda ignored:
1750             mw.put_signature(self.signature))
1751         d.addCallback(lambda ignored:
1752             mw.put_verification_key(self.verification_key))
1753         # Now try to put the signature again. This should fail
1754         d.addCallback(lambda ignored:
1755             self.shouldFail(LayoutInvalid, "signature after verification",
1756                             None,
1757                             mw.put_signature, self.signature))
1758         return d
1759
1760
1761     def test_uncoordinated_write(self):
1762         # Make two mutable writers, both pointing to the same storage
1763         # server, both at the same storage index, and try writing to the
1764         # same share.
1765         mw1 = self._make_new_mw("si1", 0)
1766         mw2 = self._make_new_mw("si1", 0)
1767
1768         def _check_success(results):
1769             result, readvs = results
1770             self.failUnless(result)
1771
1772         def _check_failure(results):
1773             result, readvs = results
1774             self.failIf(result)
1775
1776         def _write_share(mw):
1777             for i in xrange(6):
1778                 mw.put_block(self.block, i, self.salt)
1779             mw.put_encprivkey(self.encprivkey)
1780             mw.put_blockhashes(self.block_hash_tree)
1781             mw.put_sharehashes(self.share_hash_chain)
1782             mw.put_root_hash(self.root_hash)
1783             mw.put_signature(self.signature)
1784             mw.put_verification_key(self.verification_key)
1785             return mw.finish_publishing()
1786         d = _write_share(mw1)
1787         d.addCallback(_check_success)
1788         d.addCallback(lambda ignored:
1789             _write_share(mw2))
1790         d.addCallback(_check_failure)
1791         return d
1792
1793
1794     def test_invalid_salt_size(self):
1795         # Salts need to be 16 bytes in size. Writes that attempt to
1796         # write more or less than this should be rejected.
1797         mw = self._make_new_mw("si1", 0)
1798         invalid_salt = "a" * 17 # 17 bytes
1799         another_invalid_salt = "b" * 15 # 15 bytes
1800         d = defer.succeed(None)
1801         d.addCallback(lambda ignored:
1802             self.shouldFail(LayoutInvalid, "salt too big",
1803                             None,
1804                             mw.put_block, self.block, 0, invalid_salt))
1805         d.addCallback(lambda ignored:
1806             self.shouldFail(LayoutInvalid, "salt too small",
1807                             None,
1808                             mw.put_block, self.block, 0,
1809                             another_invalid_salt))
1810         return d
1811
1812
1813     def test_write_test_vectors(self):
1814         # If we give the write proxy a bogus test vector at 
1815         # any point during the process, it should fail to write when we 
1816         # tell it to write.
1817         def _check_failure(results):
1818             self.failUnlessEqual(len(results), 2)
1819             res, d = results
1820             self.failIf(res)
1821
1822         def _check_success(results):
1823             self.failUnlessEqual(len(results), 2)
1824             res, d = results
1825             self.failUnless(res)
1826
1827         mw = self._make_new_mw("si1", 0)
1828         mw.set_checkstring("this is a lie")
1829         for i in xrange(6):
1830             mw.put_block(self.block, i, self.salt)
1831         mw.put_encprivkey(self.encprivkey)
1832         mw.put_blockhashes(self.block_hash_tree)
1833         mw.put_sharehashes(self.share_hash_chain)
1834         mw.put_root_hash(self.root_hash)
1835         mw.put_signature(self.signature)
1836         mw.put_verification_key(self.verification_key)
1837         d = mw.finish_publishing()
1838         d.addCallback(_check_failure)
1839         d.addCallback(lambda ignored:
1840             mw.set_checkstring(""))
1841         d.addCallback(lambda ignored:
1842             mw.finish_publishing())
1843         d.addCallback(_check_success)
1844         return d
1845
1846
1847     def serialize_blockhashes(self, blockhashes):
1848         return "".join(blockhashes)
1849
1850
1851     def serialize_sharehashes(self, sharehashes):
1852         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1853                         for i in sorted(sharehashes.keys())])
1854         return ret
1855
1856
1857     def test_write(self):
1858         # This translates to a file with 6 6-byte segments, and with 2-byte
1859         # blocks.
1860         mw = self._make_new_mw("si1", 0)
1861         # Test writing some blocks.
1862         read = self.ss.remote_slot_readv
1863         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1864         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1865                                     PRIVATE_KEY_SIZE + \
1866                                     SIGNATURE_SIZE + \
1867                                     VERIFICATION_KEY_SIZE + \
1868                                     SHARE_HASH_CHAIN_SIZE
1869         written_block_size = 2 + len(self.salt)
1870         written_block = self.block + self.salt
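             # For reference below: the header starts with a 1-byte version,
             # an 8-byte seqnum, and a 32-byte root hash, so the root hash
             # lives at offset 9 and the encoding parameters (k, N, segsize,
             # datalen) sit at offsets 41, 42, 43, and 51 respectively.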
1871         for i in xrange(6):
1872             mw.put_block(self.block, i, self.salt)
1873
1874         mw.put_encprivkey(self.encprivkey)
1875         mw.put_blockhashes(self.block_hash_tree)
1876         mw.put_sharehashes(self.share_hash_chain)
1877         mw.put_root_hash(self.root_hash)
1878         mw.put_signature(self.signature)
1879         mw.put_verification_key(self.verification_key)
1880         d = mw.finish_publishing()
1881         def _check_publish(results):
1882             self.failUnlessEqual(len(results), 2)
1883             result, ign = results
1884             self.failUnless(result, "publish failed")
1885             for i in xrange(6):
1886                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1887                                 {0: [written_block]})
1888
1889             self.failUnlessEqual(len(self.encprivkey), 7)
1890             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1891                                  {0: [self.encprivkey]})
1892
1893             expected_block_hash_offset = expected_sharedata_offset + \
1894                         (6 * written_block_size)
1895             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1896             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1897                                  {0: [self.block_hash_tree_s]})
1898
1899             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1900             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1901                                  {0: [self.share_hash_chain_s]})
1902
1903             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1904                                  {0: [self.root_hash]})
1905             expected_signature_offset = expected_share_hash_offset + \
1906                 len(self.share_hash_chain_s)
1907             self.failUnlessEqual(len(self.signature), 9)
1908             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1909                                  {0: [self.signature]})
1910
1911             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1912             self.failUnlessEqual(len(self.verification_key), 6)
1913             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1914                                  {0: [self.verification_key]})
1915
1916             signable = mw.get_signable()
1917             verno, seq, roothash, k, n, segsize, datalen = \
1918                                             struct.unpack(">BQ32sBBQQ",
1919                                                           signable)
1920             self.failUnlessEqual(verno, 1)
1921             self.failUnlessEqual(seq, 0)
1922             self.failUnlessEqual(roothash, self.root_hash)
1923             self.failUnlessEqual(k, 3)
1924             self.failUnlessEqual(n, 10)
1925             self.failUnlessEqual(segsize, 6)
1926             self.failUnlessEqual(datalen, 36)
1927             expected_eof_offset = expected_block_hash_offset + \
1928                 len(self.block_hash_tree_s)
1929
1930             # Check the version number to make sure that it is correct.
1931             expected_version_number = struct.pack(">B", 1)
1932             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1933                                  {0: [expected_version_number]})
1934             # Check the sequence number to make sure that it is correct
1935             expected_sequence_number = struct.pack(">Q", 0)
1936             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1937                                  {0: [expected_sequence_number]})
1938             # Check that the encoding parameters (k, N, segment size, data
1939             # length) are what they should be. These are 3, 10, 6, and 36.
1940             expected_k = struct.pack(">B", 3)
1941             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1942                                  {0: [expected_k]})
1943             expected_n = struct.pack(">B", 10)
1944             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1945                                  {0: [expected_n]})
1946             expected_segment_size = struct.pack(">Q", 6)
1947             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1948                                  {0: [expected_segment_size]})
1949             expected_data_length = struct.pack(">Q", 36)
1950             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1951                                  {0: [expected_data_length]})
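                 # The offset table begins at byte 59 and holds eight 8-byte
                 # offsets, checked below in storage order: encprivkey, share
                 # hash chain, signature, verification key, end of the
                 # verification key, share data, block hashes, and EOF.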
1952             expected_offset = struct.pack(">Q", expected_private_key_offset)
1953             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1954                                  {0: [expected_offset]})
1955             expected_offset = struct.pack(">Q", expected_share_hash_offset)
1956             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1957                                  {0: [expected_offset]})
1958             expected_offset = struct.pack(">Q", expected_signature_offset)
1959             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1960                                  {0: [expected_offset]})
1961             expected_offset = struct.pack(">Q", expected_verification_key_offset)
1962             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1963                                  {0: [expected_offset]})
1964             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1965             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1966                                  {0: [expected_offset]})
1967             expected_offset = struct.pack(">Q", expected_sharedata_offset)
1968             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1969                                  {0: [expected_offset]})
1970             expected_offset = struct.pack(">Q", expected_block_hash_offset)
1971             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1972                                  {0: [expected_offset]})
1973             expected_offset = struct.pack(">Q", expected_eof_offset)
1974             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1975                                  {0: [expected_offset]})
1976         d.addCallback(_check_publish)
1977         return d
1978
1979     def _make_new_mw(self, si, share, datalength=36):
1980         # This is a file of size 36 bytes. Since it has a segment
1981         # size of 6, we know that it has six 6-byte segments, which
1982         # will be split into blocks of 2 bytes because our FEC k
1983         # parameter is 3.
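             # (Block size = segment size / k = 6 / 3 = 2 bytes, and
             # 36 / 6 = 6 segments, hence 6 blocks per share.)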
1984         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
1985                                 6, datalength)
1986         return mw
1987
1988
1989     def test_write_rejected_with_too_many_blocks(self):
1990         mw = self._make_new_mw("si0", 0)
1991
1992         # Try writing too many blocks. We should not be able to write
1993         # more than 6 blocks into each share, since the file has only
1994         # 6 segments.
1995         d = defer.succeed(None)
1996         for i in xrange(6):
1997             d.addCallback(lambda ignored, i=i:
1998                 mw.put_block(self.block, i, self.salt))
1999         d.addCallback(lambda ignored:
2000             self.shouldFail(LayoutInvalid, "too many blocks",
2001                             None,
2002                             mw.put_block, self.block, 7, self.salt))
2003         return d
2004
2005
2006     def test_write_rejected_with_invalid_salt(self):
2007         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2008         # less should cause an error.
2009         mw = self._make_new_mw("si1", 0)
2010         bad_salt = "a" * 17 # 17 bytes
2011         d = defer.succeed(None)
2012         d.addCallback(lambda ignored:
2013             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2014                             None, mw.put_block, self.block, 7, bad_salt))
2015         return d
2016
2017
2018     def test_write_rejected_with_invalid_root_hash(self):
2019         # Try writing an invalid root hash. This should be SHA256d, and
2020         # 32 bytes long as a result.
2021         mw = self._make_new_mw("si2", 0)
2022         # 17 bytes != 32 bytes
2023         invalid_root_hash = "a" * 17
2024         d = defer.succeed(None)
2025         # Before this test can work, we need to put some blocks + salts,
2026         # a block hash tree, and a share hash tree. Otherwise, we'll see
2027         # failures that match what we are looking for, but are caused by
2028         # the constraints imposed on operation ordering.
2029         for i in xrange(6):
2030             d.addCallback(lambda ignored, i=i:
2031                 mw.put_block(self.block, i, self.salt))
2032         d.addCallback(lambda ignored:
2033             mw.put_encprivkey(self.encprivkey))
2034         d.addCallback(lambda ignored:
2035             mw.put_blockhashes(self.block_hash_tree))
2036         d.addCallback(lambda ignored:
2037             mw.put_sharehashes(self.share_hash_chain))
2038         d.addCallback(lambda ignored:
2039             self.shouldFail(LayoutInvalid, "invalid root hash",
2040                             None, mw.put_root_hash, invalid_root_hash))
2041         return d
2042
2043
2044     def test_write_rejected_with_invalid_blocksize(self):
2045         # The blocksize implied by the writer that we get from
2046         # _make_new_mw is 2 bytes -- any more or any less than this
2047         # should be cause for failure, unless it is the tail segment,
2048         # in which case a shorter block may be acceptable.
2049         invalid_block = "a"
2050         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2051                                              # one byte blocks
2052         # 1 byte != 2 bytes
2053         d = defer.succeed(None)
2054         d.addCallback(lambda ignored, invalid_block=invalid_block:
2055             self.shouldFail(LayoutInvalid, "test blocksize too small",
2056                             None, mw.put_block, invalid_block, 0,
2057                             self.salt))
2058         invalid_block = invalid_block * 3
2059         # 3 bytes != 2 bytes
2060         d.addCallback(lambda ignored:
2061             self.shouldFail(LayoutInvalid, "test blocksize too large",
2062                             None,
2063                             mw.put_block, invalid_block, 0, self.salt))
2064         for i in xrange(5):
2065             d.addCallback(lambda ignored, i=i:
2066                 mw.put_block(self.block, i, self.salt))
2067         # Try to put an invalid tail segment
2068         d.addCallback(lambda ignored:
2069             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2070                             None,
2071                             mw.put_block, self.block, 5, self.salt))
2072         valid_block = "a"
2073         d.addCallback(lambda ignored:
2074             mw.put_block(valid_block, 5, self.salt))
2075         return d
2076
2077
2078     def test_write_enforces_order_constraints(self):
2079         # We require that the MDMFSlotWriteProxy be interacted with in a
2080         # specific way.
2081         # That way is:
2082         # 0: __init__
2083         # 1: write blocks and salts
2084         # 2: Write the encrypted private key
2085         # 3: Write the block hashes
2086         # 4: Write the share hashes
2087         # 5: Write the root hash and salt hash
2088         # 6: Write the signature and verification key
2089         # 7: Write the file.
2090         # 
2091         # Some of these can be performed out-of-order, and some can't.
2092         # The dependencies that I want to test here are:
2093         #  - Private key before block hashes
2094         #  - share hashes and block hashes before root hash
2095         #  - root hash before signature
2096         #  - signature before verification key
2097         mw0 = self._make_new_mw("si0", 0)
2098         # Write some shares
2099         d = defer.succeed(None)
2100         for i in xrange(6):
2101             d.addCallback(lambda ignored, i=i:
2102                 mw0.put_block(self.block, i, self.salt))
2103
2104         # Try to write the share hash chain without writing the
2105         # encrypted private key
2106         d.addCallback(lambda ignored:
2107             self.shouldFail(LayoutInvalid, "share hash chain before "
2108                                            "private key",
2109                             None,
2110                             mw0.put_sharehashes, self.share_hash_chain))
2111         # Write the private key.
2112         d.addCallback(lambda ignored:
2113             mw0.put_encprivkey(self.encprivkey))
2114
2115         # Now write the block hashes and try again
2116         d.addCallback(lambda ignored:
2117             mw0.put_blockhashes(self.block_hash_tree))
2118
2119         # We haven't yet put the root hash on the share, so we shouldn't
2120         # be able to sign it.
2121         d.addCallback(lambda ignored:
2122             self.shouldFail(LayoutInvalid, "signature before root hash",
2123                             None, mw0.put_signature, self.signature))
2124
2125         d.addCallback(lambda ignored:
2126             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2127
2128         # ...and, since that fails, we also shouldn't be able to put the
2129         # verification key.
2130         d.addCallback(lambda ignored:
2131             self.shouldFail(LayoutInvalid, "key before signature",
2132                             None, mw0.put_verification_key,
2133                             self.verification_key))
2134
2135         # Now write the share hashes.
2136         d.addCallback(lambda ignored:
2137             mw0.put_sharehashes(self.share_hash_chain))
2138         # We should be able to write the root hash now too
2139         d.addCallback(lambda ignored:
2140             mw0.put_root_hash(self.root_hash))
2141
2142         # We should still be unable to put the verification key
2143         d.addCallback(lambda ignored:
2144             self.shouldFail(LayoutInvalid, "key before signature",
2145                             None, mw0.put_verification_key,
2146                             self.verification_key))
2147
2148         d.addCallback(lambda ignored:
2149             mw0.put_signature(self.signature))
2150
2151         # We shouldn't be able to write the offsets to the remote server
2152         # until the offset table is finished; IOW, until we have written
2153         # the verification key.
2154         d.addCallback(lambda ignored:
2155             self.shouldFail(LayoutInvalid, "offsets before verification key",
2156                             None,
2157                             mw0.finish_publishing))
2158
2159         d.addCallback(lambda ignored:
2160             mw0.put_verification_key(self.verification_key))
2161         return d
2162
2163
2164     def test_end_to_end(self):
2165         mw = self._make_new_mw("si1", 0)
2166         # Write a share using the mutable writer, and make sure that the
2167         # reader knows how to read everything back to us.
2168         d = defer.succeed(None)
2169         for i in xrange(6):
2170             d.addCallback(lambda ignored, i=i:
2171                 mw.put_block(self.block, i, self.salt))
2172         d.addCallback(lambda ignored:
2173             mw.put_encprivkey(self.encprivkey))
2174         d.addCallback(lambda ignored:
2175             mw.put_blockhashes(self.block_hash_tree))
2176         d.addCallback(lambda ignored:
2177             mw.put_sharehashes(self.share_hash_chain))
2178         d.addCallback(lambda ignored:
2179             mw.put_root_hash(self.root_hash))
2180         d.addCallback(lambda ignored:
2181             mw.put_signature(self.signature))
2182         d.addCallback(lambda ignored:
2183             mw.put_verification_key(self.verification_key))
2184         d.addCallback(lambda ignored:
2185             mw.finish_publishing())
2186
2187         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2188         def _check_block_and_salt((block, salt)):
2189             self.failUnlessEqual(block, self.block)
2190             self.failUnlessEqual(salt, self.salt)
2191
2192         for i in xrange(6):
2193             d.addCallback(lambda ignored, i=i:
2194                 mr.get_block_and_salt(i))
2195             d.addCallback(_check_block_and_salt)
2196
2197         d.addCallback(lambda ignored:
2198             mr.get_encprivkey())
2199         d.addCallback(lambda encprivkey:
2200             self.failUnlessEqual(self.encprivkey, encprivkey))
2201
2202         d.addCallback(lambda ignored:
2203             mr.get_blockhashes())
2204         d.addCallback(lambda blockhashes:
2205             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2206
2207         d.addCallback(lambda ignored:
2208             mr.get_sharehashes())
2209         d.addCallback(lambda sharehashes:
2210             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2211
2212         d.addCallback(lambda ignored:
2213             mr.get_signature())
2214         d.addCallback(lambda signature:
2215             self.failUnlessEqual(signature, self.signature))
2216
2217         d.addCallback(lambda ignored:
2218             mr.get_verification_key())
2219         d.addCallback(lambda verification_key:
2220             self.failUnlessEqual(verification_key, self.verification_key))
2221
2222         d.addCallback(lambda ignored:
2223             mr.get_seqnum())
2224         d.addCallback(lambda seqnum:
2225             self.failUnlessEqual(seqnum, 0))
2226
2227         d.addCallback(lambda ignored:
2228             mr.get_root_hash())
2229         d.addCallback(lambda root_hash:
2230             self.failUnlessEqual(self.root_hash, root_hash))
2231
2232         d.addCallback(lambda ignored:
2233             mr.get_encoding_parameters())
2234         def _check_encoding_parameters((k, n, segsize, datalen)):
2235             self.failUnlessEqual(k, 3)
2236             self.failUnlessEqual(n, 10)
2237             self.failUnlessEqual(segsize, 6)
2238             self.failUnlessEqual(datalen, 36)
2239         d.addCallback(_check_encoding_parameters)
2240
2241         d.addCallback(lambda ignored:
2242             mr.get_checkstring())
2243         d.addCallback(lambda checkstring:
2244             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2245         return d
2246
2247
2248     def test_is_sdmf(self):
2249         # The MDMFSlotReadProxy should also know how to read SDMF files,
2250         # since it will encounter them on the grid. Callers use the
2251         # is_sdmf method to test this.
2252         self.write_sdmf_share_to_server("si1")
2253         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2254         d = mr.is_sdmf()
2255         d.addCallback(lambda issdmf:
2256             self.failUnless(issdmf))
2257         return d
2258
2259
2260     def test_reads_sdmf(self):
2261         # The slot read proxy should, naturally, know how to tell us
2262         # about data in the SDMF format
2263         self.write_sdmf_share_to_server("si1")
2264         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2265         d = defer.succeed(None)
2266         d.addCallback(lambda ignored:
2267             mr.is_sdmf())
2268         d.addCallback(lambda issdmf:
2269             self.failUnless(issdmf))
2270
2271         # What do we need to read?
2272         #  - The sharedata
2273         #  - The salt
2274         d.addCallback(lambda ignored:
2275             mr.get_block_and_salt(0))
2276         def _check_block_and_salt(results):
2277             block, salt = results
2278             # Our original file is 36 bytes long, so with k = 3 each
2279             # share's block is 12 bytes. That block is composed entirely
2280             # of the letter a; self.block contains two a's, so 6 *
2281             # self.block is what we are looking for.
2282             self.failUnlessEqual(block, self.block * 6)
2283             self.failUnlessEqual(salt, self.salt)
2284         d.addCallback(_check_block_and_salt)
2285
2286         #  - The blockhashes
2287         d.addCallback(lambda ignored:
2288             mr.get_blockhashes())
2289         d.addCallback(lambda blockhashes:
2290             self.failUnlessEqual(self.block_hash_tree,
2291                                  blockhashes,
2292                                  blockhashes))
2293         #  - The sharehashes
2294         d.addCallback(lambda ignored:
2295             mr.get_sharehashes())
2296         d.addCallback(lambda sharehashes:
2297             self.failUnlessEqual(self.share_hash_chain,
2298                                  sharehashes))
2299         #  - The keys
2300         d.addCallback(lambda ignored:
2301             mr.get_encprivkey())
2302         d.addCallback(lambda encprivkey:
2303             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2304         d.addCallback(lambda ignored:
2305             mr.get_verification_key())
2306         d.addCallback(lambda verification_key:
2307             self.failUnlessEqual(verification_key,
2308                                  self.verification_key,
2309                                  verification_key))
2310         #  - The signature
2311         d.addCallback(lambda ignored:
2312             mr.get_signature())
2313         d.addCallback(lambda signature:
2314             self.failUnlessEqual(signature, self.signature, signature))
2315
2316         #  - The sequence number
2317         d.addCallback(lambda ignored:
2318             mr.get_seqnum())
2319         d.addCallback(lambda seqnum:
2320             self.failUnlessEqual(seqnum, 0, seqnum))
2321
2322         #  - The root hash
2323         d.addCallback(lambda ignored:
2324             mr.get_root_hash())
2325         d.addCallback(lambda root_hash:
2326             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2327         return d
2328
2329
2330     def test_only_reads_one_segment_sdmf(self):
2331         # SDMF shares have only one segment, so it doesn't make sense to
2332         # read more segments than that. The reader should know this and
2333         # complain if we try to do that.
2334         self.write_sdmf_share_to_server("si1")
2335         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2336         d = defer.succeed(None)
2337         d.addCallback(lambda ignored:
2338             mr.is_sdmf())
2339         d.addCallback(lambda issdmf:
2340             self.failUnless(issdmf))
2341         d.addCallback(lambda ignored:
2342             self.shouldFail(LayoutInvalid, "test bad segment",
2343                             None,
2344                             mr.get_block_and_salt, 1))
2345         return d
2346
2347
2348     def test_read_with_prefetched_mdmf_data(self):
2349         # The MDMFSlotReadProxy will prefill certain fields if you pass
2350         # it data that you have already fetched. This is useful for
2351         # cases like the Servermap, which prefetches ~2kb of data while
2352         # finding out which shares are on the remote peer so that it
2353         # doesn't waste round trips.
2354         mdmf_data = self.build_test_mdmf_share()
2355         self.write_test_share_to_server("si1")
2356         def _make_mr(ignored, length):
2357             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2358             return mr
2359
2360         d = defer.succeed(None)
2361         # This should be enough to fill in both the encoding parameters
2362         # and the table of offsets, which will complete the version
2363         # information tuple.
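             # (123 bytes covers the 59-byte signable header -- 1 + 8 + 32 +
             # 1 + 1 + 8 + 8 -- plus the eight 8-byte entries of the offset
             # table.)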
2364         d.addCallback(_make_mr, 123)
2365         d.addCallback(lambda mr:
2366             mr.get_verinfo())
2367         def _check_verinfo(verinfo):
2368             self.failUnless(verinfo)
2369             self.failUnlessEqual(len(verinfo), 9)
2370             (seqnum,
2371              root_hash,
2372              salt_hash,
2373              segsize,
2374              datalen,
2375              k,
2376              n,
2377              prefix,
2378              offsets) = verinfo
2379             self.failUnlessEqual(seqnum, 0)
2380             self.failUnlessEqual(root_hash, self.root_hash)
2381             self.failUnlessEqual(segsize, 6)
2382             self.failUnlessEqual(datalen, 36)
2383             self.failUnlessEqual(k, 3)
2384             self.failUnlessEqual(n, 10)
2385             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2386                                           1,
2387                                           seqnum,
2388                                           root_hash,
2389                                           k,
2390                                           n,
2391                                           segsize,
2392                                           datalen)
2393             self.failUnlessEqual(expected_prefix, prefix)
2394             self.failUnlessEqual(self.rref.read_count, 0)
2395         d.addCallback(_check_verinfo)
2396         # This is not enough data to read a block and a salt, so the
2397         # wrapper should attempt to read this from the remote server.
2398         d.addCallback(_make_mr, 123)
2399         d.addCallback(lambda mr:
2400             mr.get_block_and_salt(0))
2401         def _check_block_and_salt((block, salt)):
2402             self.failUnlessEqual(block, self.block)
2403             self.failUnlessEqual(salt, self.salt)
2404             self.failUnlessEqual(self.rref.read_count, 1)
2405         # This should be enough data to read one block.
2406         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2407         d.addCallback(lambda mr:
2408             mr.get_block_and_salt(0))
2409         d.addCallback(_check_block_and_salt)
2410         return d
2411
2412
2413     def test_read_with_prefetched_sdmf_data(self):
2414         sdmf_data = self.build_test_sdmf_share()
2415         self.write_sdmf_share_to_server("si1")
2416         def _make_mr(ignored, length):
2417             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2418             return mr
2419
2420         d = defer.succeed(None)
2421         # This should be enough to get us the encoding parameters,
2422         # offset table, and everything else we need to build a verinfo
2423         # string.
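             # (The SDMF prefix is 1 + 8 + 32 + 16 + 1 + 1 + 8 + 8 = 75 bytes
             # and the ">LLLLQQ" offset table adds another 32, so 123 bytes is
             # enough to cover both.)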
2424         d.addCallback(_make_mr, 123)
2425         d.addCallback(lambda mr:
2426             mr.get_verinfo())
2427         def _check_verinfo(verinfo):
2428             self.failUnless(verinfo)
2429             self.failUnlessEqual(len(verinfo), 9)
2430             (seqnum,
2431              root_hash,
2432              salt,
2433              segsize,
2434              datalen,
2435              k,
2436              n,
2437              prefix,
2438              offsets) = verinfo
2439             self.failUnlessEqual(seqnum, 0)
2440             self.failUnlessEqual(root_hash, self.root_hash)
2441             self.failUnlessEqual(salt, self.salt)
2442             self.failUnlessEqual(segsize, 36)
2443             self.failUnlessEqual(datalen, 36)
2444             self.failUnlessEqual(k, 3)
2445             self.failUnlessEqual(n, 10)
2446             expected_prefix = struct.pack(SIGNED_PREFIX,
2447                                           0,
2448                                           seqnum,
2449                                           root_hash,
2450                                           salt,
2451                                           k,
2452                                           n,
2453                                           segsize,
2454                                           datalen)
2455             self.failUnlessEqual(expected_prefix, prefix)
2456             self.failUnlessEqual(self.rref.read_count, 0)
2457         d.addCallback(_check_verinfo)
2458         # This shouldn't be enough to read any share data.
2459         d.addCallback(_make_mr, 123)
2460         d.addCallback(lambda mr:
2461             mr.get_block_and_salt(0))
2462         def _check_block_and_salt((block, salt)):
2463             self.failUnlessEqual(block, self.block * 6)
2464             self.failUnlessEqual(salt, self.salt)
2465             # TODO: Fix the read routine so that it reads only the data
2466             #       that it has cached if it can't read all of it.
2467             self.failUnlessEqual(self.rref.read_count, 2)
2468
2469         # This should be enough to read share data.
2470         d.addCallback(_make_mr, self.offsets['share_data'])
2471         d.addCallback(lambda mr:
2472             mr.get_block_and_salt(0))
2473         d.addCallback(_check_block_and_salt)
2474         return d
2475
2476
2477     def test_read_with_empty_mdmf_file(self):
2478         # Some tests upload a file with no contents to test things
2479         # unrelated to how the file's contents are handled. The reader
2480         # should behave sensibly in these cases.
2481         self.write_test_share_to_server("si1", empty=True)
2482         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2483         # We should be able to get the encoding parameters, and they
2484         # should be correct.
2485         d = defer.succeed(None)
2486         d.addCallback(lambda ignored:
2487             mr.get_encoding_parameters())
2488         def _check_encoding_parameters(params):
2489             self.failUnlessEqual(len(params), 4)
2490             k, n, segsize, datalen = params
2491             self.failUnlessEqual(k, 3)
2492             self.failUnlessEqual(n, 10)
2493             self.failUnlessEqual(segsize, 0)
2494             self.failUnlessEqual(datalen, 0)
2495         d.addCallback(_check_encoding_parameters)
2496
2497         # We should not be able to fetch a block, since there are no
2498         # blocks to fetch
2499         d.addCallback(lambda ignored:
2500             self.shouldFail(LayoutInvalid, "get block on empty file",
2501                             None,
2502                             mr.get_block_and_salt, 0))
2503         return d
2504
2505
2506     def test_read_with_empty_sdmf_file(self):
2507         self.write_sdmf_share_to_server("si1", empty=True)
2508         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2509         # We should be able to get the encoding parameters, and they
2510         # should be correct
2511         d = defer.succeed(None)
2512         d.addCallback(lambda ignored:
2513             mr.get_encoding_parameters())
2514         def _check_encoding_parameters(params):
2515             self.failUnlessEqual(len(params), 4)
2516             k, n, segsize, datalen = params
2517             self.failUnlessEqual(k, 3)
2518             self.failUnlessEqual(n, 10)
2519             self.failUnlessEqual(segsize, 0)
2520             self.failUnlessEqual(datalen, 0)
2521         d.addCallback(_check_encoding_parameters)
2522
2523         # It does not make sense to get a block in this format, so we
2524         # should not be able to.
2525         d.addCallback(lambda ignored:
2526             self.shouldFail(LayoutInvalid, "get block on an empty file",
2527                             None,
2528                             mr.get_block_and_salt, 0))
2529         return d
2530
2531
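    # The verinfo tests below unpack the same 9-tuple for both formats:
    # (seqnum, root_hash, salt_or_IV, segsize, datalen, k, n, prefix, offsets).
    # For SDMF the prefix is packed with SIGNED_PREFIX (spelled out as
    # ">BQ32s16s BBQQ" below: version byte 0, salt included); for MDMF it is
    # packed with ">BQ32s BBQQ" (version byte 1, no salt), and the IV element
    # of the tuple is expected to be empty.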
2532     def test_verinfo_with_sdmf_file(self):
2533         self.write_sdmf_share_to_server("si1")
2534         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2535         # We should be able to get the version information.
2536         d = defer.succeed(None)
2537         d.addCallback(lambda ignored:
2538             mr.get_verinfo())
2539         def _check_verinfo(verinfo):
2540             self.failUnless(verinfo)
2541             self.failUnlessEqual(len(verinfo), 9)
2542             (seqnum,
2543              root_hash,
2544              salt,
2545              segsize,
2546              datalen,
2547              k,
2548              n,
2549              prefix,
2550              offsets) = verinfo
2551             self.failUnlessEqual(seqnum, 0)
2552             self.failUnlessEqual(root_hash, self.root_hash)
2553             self.failUnlessEqual(salt, self.salt)
2554             self.failUnlessEqual(segsize, 36)
2555             self.failUnlessEqual(datalen, 36)
2556             self.failUnlessEqual(k, 3)
2557             self.failUnlessEqual(n, 10)
2558             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2559                                           0,
2560                                           seqnum,
2561                                           root_hash,
2562                                           salt,
2563                                           k,
2564                                           n,
2565                                           segsize,
2566                                           datalen)
2567             self.failUnlessEqual(prefix, expected_prefix)
2568             self.failUnlessEqual(offsets, self.offsets)
2569         d.addCallback(_check_verinfo)
2570         return d
2571
2572
2573     def test_verinfo_with_mdmf_file(self):
2574         self.write_test_share_to_server("si1")
2575         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2576         d = defer.succeed(None)
2577         d.addCallback(lambda ignored:
2578             mr.get_verinfo())
2579         def _check_verinfo(verinfo):
2580             self.failUnless(verinfo)
2581             self.failUnlessEqual(len(verinfo), 9)
2582             (seqnum,
2583              root_hash,
2584              IV,
2585              segsize,
2586              datalen,
2587              k,
2588              n,
2589              prefix,
2590              offsets) = verinfo
2591             self.failUnlessEqual(seqnum, 0)
2592             self.failUnlessEqual(root_hash, self.root_hash)
2593             self.failIf(IV)
2594             self.failUnlessEqual(segsize, 6)
2595             self.failUnlessEqual(datalen, 36)
2596             self.failUnlessEqual(k, 3)
2597             self.failUnlessEqual(n, 10)
2598             expected_prefix = struct.pack(">BQ32s BBQQ",
2599                                           1,
2600                                           seqnum,
2601                                           root_hash,
2602                                           k,
2603                                           n,
2604                                           segsize,
2605                                           datalen)
2606             self.failUnlessEqual(prefix, expected_prefix)
2607             self.failUnlessEqual(offsets, self.offsets)
2608         d.addCallback(_check_verinfo)
2609         return d
2610
2611
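    # A rough sketch of the SDMF write protocol exercised below, as this test
    # understands it: the proxy accepts put_block, put_encprivkey,
    # put_blockhashes, put_sharehashes, put_root_hash, put_signature and
    # put_verification_key while buffering everything locally (write_count
    # stays 0), and only finish_publishing() sends the assembled share to the
    # storage server in a single remote write.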
2612     def test_sdmf_writer(self):
2613         # Go through the motions of writing an SDMF share to the storage
2614         # server, then read it back to check that the share was written
2615         # the way we expect.
2616
2617         # We do this first so that the necessary instance variables get
2618         # set the way we want them for the tests below.
2619         data = self.build_test_sdmf_share()
2620         sdmfr = SDMFSlotWriteProxy(0,
2621                                    self.rref,
2622                                    "si1",
2623                                    self.secrets,
2624                                    0, 3, 10, 36, 36)
2625         # Put the block and salt.
2626         sdmfr.put_block(self.blockdata, 0, self.salt)
2627
2628         # Put the encprivkey
2629         sdmfr.put_encprivkey(self.encprivkey)
2630
2631         # Put the block and share hash chains
2632         sdmfr.put_blockhashes(self.block_hash_tree)
2633         sdmfr.put_sharehashes(self.share_hash_chain)
2634         sdmfr.put_root_hash(self.root_hash)
2635
2636         # Put the signature
2637         sdmfr.put_signature(self.signature)
2638
2639         # Put the verification key
2640         sdmfr.put_verification_key(self.verification_key)
2641
2642         # Now check to make sure that nothing has been written yet.
2643         self.failUnlessEqual(self.rref.write_count, 0)
2644
2645         # Now finish publishing
2646         d = sdmfr.finish_publishing()
2647         def _then(ignored):
2648             self.failUnlessEqual(self.rref.write_count, 1)
2649             read = self.ss.remote_slot_readv
2650             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2651                                  {0: [data]})
2652         d.addCallback(_then)
2653         return d
2654
2655
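    # The checkstring acts as a test-and-set guard: publishing against an
    # existing slot with the default (empty) checkstring is expected to fail,
    # but the failure result carries the slot's current checkstring, which the
    # writer can adopt via set_checkstring() and then retry successfully.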
2656     def test_sdmf_writer_preexisting_share(self):
2657         data = self.build_test_sdmf_share()
2658         self.write_sdmf_share_to_server("si1")
2659
2660         # Now there is a share on the storage server. To successfully
2661         # write, we need to set the checkstring correctly. When we
2662         # don't, no write should occur.
2663         sdmfw = SDMFSlotWriteProxy(0,
2664                                    self.rref,
2665                                    "si1",
2666                                    self.secrets,
2667                                    1, 3, 10, 36, 36)
2668         sdmfw.put_block(self.blockdata, 0, self.salt)
2669
2670         # Put the encprivkey
2671         sdmfw.put_encprivkey(self.encprivkey)
2672
2673         # Put the block and share hash chains
2674         sdmfw.put_blockhashes(self.block_hash_tree)
2675         sdmfw.put_sharehashes(self.share_hash_chain)
2676
2677         # Put the root hash
2678         sdmfw.put_root_hash(self.root_hash)
2679
2680         # Put the signature
2681         sdmfw.put_signature(self.signature)
2682
2683         # Put the verification key
2684         sdmfw.put_verification_key(self.verification_key)
2685
2686         # We shouldn't have a checkstring yet
2687         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2688
2689         d = sdmfw.finish_publishing()
2690         def _then(results):
2691             self.failIf(results[0])
2692             # this is the correct checkstring
2693             self._expected_checkstring = results[1][0][0]
2694             return self._expected_checkstring
2695
2696         d.addCallback(_then)
2697         d.addCallback(sdmfw.set_checkstring)
2698         d.addCallback(lambda ignored:
2699             sdmfw.get_checkstring())
2700         d.addCallback(lambda checkstring:
2701             self.failUnlessEqual(checkstring, self._expected_checkstring))
2702         d.addCallback(lambda ignored:
2703             sdmfw.finish_publishing())
2704         def _then_again(results):
2705             self.failUnless(results[0])
2706             read = self.ss.remote_slot_readv
2707             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2708                                  {0: [struct.pack(">Q", 1)]})
2709             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2710                                  {0: [data[9:]]})
2711         d.addCallback(_then_again)
2712         return d
2713
2714
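# The latency-tracking tests below suggest that StorageServer.add_latency
# keeps only the most recent samples per category (at most 1000: 10000
# "allocate" samples are added but only 9000..9999 are retained, giving a
# mean near 9500), and that get_latencies() reports a mean plus a fixed set
# of percentiles, with entries reported as None when there are too few
# samples for that percentile to be meaningful (e.g. with 20 samples the 1st
# and 99th percentiles are None; with a single sample even the mean is None).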
2715 class Stats(unittest.TestCase):
2716
2717     def setUp(self):
2718         self.sparent = LoggingServiceParent()
2719         self._lease_secret = itertools.count()
2720     def tearDown(self):
2721         return self.sparent.stopService()
2722
2723     def workdir(self, name):
2724         basedir = os.path.join("storage", "Server", name)
2725         return basedir
2726
2727     def create(self, name):
2728         workdir = self.workdir(name)
2729         ss = StorageServer(workdir, "\x00" * 20)
2730         ss.setServiceParent(self.sparent)
2731         return ss
2732
2733     def test_latencies(self):
2734         ss = self.create("test_latencies")
2735         for i in range(10000):
2736             ss.add_latency("allocate", 1.0 * i)
2737         for i in range(1000):
2738             ss.add_latency("renew", 1.0 * i)
2739         for i in range(20):
2740             ss.add_latency("write", 1.0 * i)
2741         for i in range(10):
2742             ss.add_latency("cancel", 2.0 * i)
2743         ss.add_latency("get", 5.0)
2744
2745         output = ss.get_latencies()
2746
2747         self.failUnlessEqual(sorted(output.keys()),
2748                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2749         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2750         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2751         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2752         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2753         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2754         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2755         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2756         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2757         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2758
2759         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2760         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2761         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2762         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2763         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2764         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2765         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2766         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2767         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2768
2769         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2770         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2771         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2772         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2773         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2774         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2775         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2776         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2777         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2778
2779         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2780         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2781         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2782         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2783         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2784         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2785         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2786         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2787         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2788
2789         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2790         self.failUnless(output["get"]["mean"] is None, output)
2791         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2792         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2793         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2794         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2795         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2796         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2797         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2798
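# Helper used by the web-status tests below: strip HTML tags and collapse
# whitespace, leaving single spaces where the tags were. For example,
# remove_tags("<p>Total buckets: 0</p>") returns " Total buckets: 0 "
# (note the surviving leading and trailing spaces).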
2799 def remove_tags(s):
2800     s = re.sub(r'<[^>]*>', ' ', s)
2801     s = re.sub(r'\s+', ' ', s)
2802     return s
2803
2804 class MyBucketCountingCrawler(BucketCountingCrawler):
2805     def finished_prefix(self, cycle, prefix):
2806         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2807         if self.hook_ds:
2808             d = self.hook_ds.pop(0)
2809             d.callback(None)
2810
2811 class MyStorageServer(StorageServer):
2812     def add_bucket_counter(self):
2813         statefile = os.path.join(self.storedir, "bucket_counter.state")
2814         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2815         self.bucket_counter.setServiceParent(self)
2816
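# MyBucketCountingCrawler/MyStorageServer above form a small test hook:
# every finished prefix pops the next Deferred from hook_ds and fires it,
# which lets test_bucket_counter_eta observe the crawler after one, two and
# three prefixes without polling.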
2817 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2818
2819     def setUp(self):
2820         self.s = service.MultiService()
2821         self.s.startService()
2822     def tearDown(self):
2823         return self.s.stopService()
2824
2825     def test_bucket_counter(self):
2826         basedir = "storage/BucketCounter/bucket_counter"
2827         fileutil.make_dirs(basedir)
2828         ss = StorageServer(basedir, "\x00" * 20)
2829         # to make sure we capture the bucket-counting-crawler in the middle
2830         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2831         # also make it start sooner than usual.
2832         ss.bucket_counter.slow_start = 0
2833         orig_cpu_slice = ss.bucket_counter.cpu_slice
2834         ss.bucket_counter.cpu_slice = 0
2835         ss.setServiceParent(self.s)
2836
2837         w = StorageStatus(ss)
2838
2839         # this sample is before the crawler has started doing anything
2840         html = w.renderSynchronously()
2841         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2842         s = remove_tags(html)
2843         self.failUnlessIn("Accepting new shares: Yes", s)
2844         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2845         self.failUnlessIn("Total buckets: Not computed yet", s)
2846         self.failUnlessIn("Next crawl in", s)
2847
2848         # give the bucket-counting-crawler one tick to get started. The
2849         # cpu_slice=0 will force it to yield right after it processes the
2850         # first prefix
2851
2852         d = fireEventually()
2853         def _check(ignored):
2854             # are we really right after the first prefix?
2855             state = ss.bucket_counter.get_state()
2856             if state["last-complete-prefix"] is None:
2857                 d2 = fireEventually()
2858                 d2.addCallback(_check)
2859                 return d2
2860             self.failUnlessEqual(state["last-complete-prefix"],
2861                                  ss.bucket_counter.prefixes[0])
2862             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2863             html = w.renderSynchronously()
2864             s = remove_tags(html)
2865             self.failUnlessIn(" Current crawl ", s)
2866             self.failUnlessIn(" (next work in ", s)
2867         d.addCallback(_check)
2868
2869         # now give it enough time to complete a full cycle
2870         def _watch():
2871             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2872         d.addCallback(lambda ignored: self.poll(_watch))
2873         def _check2(ignored):
2874             ss.bucket_counter.cpu_slice = orig_cpu_slice
2875             html = w.renderSynchronously()
2876             s = remove_tags(html)
2877             self.failUnlessIn("Total buckets: 0 (the number of", s)
2878             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2879         d.addCallback(_check2)
2880         return d
2881
2882     def test_bucket_counter_cleanup(self):
2883         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2884         fileutil.make_dirs(basedir)
2885         ss = StorageServer(basedir, "\x00" * 20)
2886         # to make sure we capture the bucket-counting-crawler in the middle
2887         # of a cycle, we reach in and reduce its maximum slice time to 0.
2888         ss.bucket_counter.slow_start = 0
2889         orig_cpu_slice = ss.bucket_counter.cpu_slice
2890         ss.bucket_counter.cpu_slice = 0
2891         ss.setServiceParent(self.s)
2892
2893         d = fireEventually()
2894
2895         def _after_first_prefix(ignored):
2896             state = ss.bucket_counter.state
2897             if state["last-complete-prefix"] is None:
2898                 d2 = fireEventually()
2899                 d2.addCallback(_after_first_prefix)
2900                 return d2
2901             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2902             # now sneak in and mess with its state, to make sure it cleans up
2903             # properly at the end of the cycle
2904             self.failUnlessEqual(state["last-complete-prefix"],
2905                                  ss.bucket_counter.prefixes[0])
2906             state["bucket-counts"][-12] = {}
2907             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2908             ss.bucket_counter.save_state()
2909         d.addCallback(_after_first_prefix)
2910
2911         # now give it enough time to complete a cycle
2912         def _watch():
2913             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2914         d.addCallback(lambda ignored: self.poll(_watch))
2915         def _check2(ignored):
2916             ss.bucket_counter.cpu_slice = orig_cpu_slice
2917             s = ss.bucket_counter.get_state()
2918             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2919             self.failIf("bogusprefix!" in s["storage-index-samples"],
2920                         s["storage-index-samples"].keys())
2921         d.addCallback(_check2)
2922         return d
2923
2924     def test_bucket_counter_eta(self):
2925         basedir = "storage/BucketCounter/bucket_counter_eta"
2926         fileutil.make_dirs(basedir)
2927         ss = MyStorageServer(basedir, "\x00" * 20)
2928         ss.bucket_counter.slow_start = 0
2929         # these will be fired inside finished_prefix()
2930         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2931         w = StorageStatus(ss)
2932
2933         d = defer.Deferred()
2934
2935         def _check_1(ignored):
2936             # no ETA is available yet
2937             html = w.renderSynchronously()
2938             s = remove_tags(html)
2939             self.failUnlessIn("complete (next work", s)
2940
2941         def _check_2(ignored):
2942             # one prefix has finished, so an ETA based upon that elapsed time
2943             # should be available.
2944             html = w.renderSynchronously()
2945             s = remove_tags(html)
2946             self.failUnlessIn("complete (ETA ", s)
2947
2948         def _check_3(ignored):
2949             # two prefixes have finished
2950             html = w.renderSynchronously()
2951             s = remove_tags(html)
2952             self.failUnlessIn("complete (ETA ", s)
2953             d.callback("done")
2954
2955         hooks[0].addCallback(_check_1).addErrback(d.errback)
2956         hooks[1].addCallback(_check_2).addErrback(d.errback)
2957         hooks[2].addCallback(_check_3).addErrback(d.errback)
2958
2959         ss.setServiceParent(self.s)
2960         return d
2961
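# The instrumented crawler below pauses itself after the first processed
# bucket by dropping cpu_slice below zero (so the crawler yields
# immediately), then restores a large slice once the flag is cleared.
# No_ST_BLOCKS_LeaseCheckingCrawler wraps os.stat() to omit st_blocks,
# simulating platforms where that attribute is not available.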
2962 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
2963     stop_after_first_bucket = False
2964     def process_bucket(self, *args, **kwargs):
2965         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
2966         if self.stop_after_first_bucket:
2967             self.stop_after_first_bucket = False
2968             self.cpu_slice = -1.0
2969     def yielding(self, sleep_time):
2970         if not self.stop_after_first_bucket:
2971             self.cpu_slice = 500
2972
2973 class BrokenStatResults:
2974     pass
2975 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
2976     def stat(self, fn):
2977         s = os.stat(fn)
2978         bsr = BrokenStatResults()
2979         for attrname in dir(s):
2980             if attrname.startswith("_"):
2981                 continue
2982             if attrname == "st_blocks":
2983                 continue
2984             setattr(bsr, attrname, getattr(s, attrname))
2985         return bsr
2986
2987 class InstrumentedStorageServer(StorageServer):
2988     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
2989 class No_ST_BLOCKS_StorageServer(StorageServer):
2990     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
2991
2992 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2993
2994     def setUp(self):
2995         self.s = service.MultiService()
2996         self.s.startService()
2997     def tearDown(self):
2998         return self.s.stopService()
2999
3000     def make_shares(self, ss):
3001         def make(si):
3002             return (si, hashutil.tagged_hash("renew", si),
3003                     hashutil.tagged_hash("cancel", si))
3004         def make_mutable(si):
3005             return (si, hashutil.tagged_hash("renew", si),
3006                     hashutil.tagged_hash("cancel", si),
3007                     hashutil.tagged_hash("write-enabler", si))
3008         def make_extra_lease(si, num):
3009             return (hashutil.tagged_hash("renew-%d" % num, si),
3010                     hashutil.tagged_hash("cancel-%d" % num, si))
3011
3012         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3013         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3014         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3015         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3016         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3017         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3018         sharenums = [0]
3019         canary = FakeCanary()
3020         # note: 'tahoe debug dump-share' will not handle this file, since the
3021         # inner contents are not a valid CHK share
3022         data = "\xff" * 1000
3023
3024         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3025                                          1000, canary)
3026         w[0].remote_write(0, data)
3027         w[0].remote_close()
3028
3029         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3030                                          1000, canary)
3031         w[0].remote_write(0, data)
3032         w[0].remote_close()
3033         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3034
3035         writev = ss.remote_slot_testv_and_readv_and_writev
3036         writev(mutable_si_2, (we2, rs2, cs2),
3037                {0: ([], [(0,data)], len(data))}, [])
3038         writev(mutable_si_3, (we3, rs3, cs3),
3039                {0: ([], [(0,data)], len(data))}, [])
3040         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3041
3042         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3043         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3044         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3045
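    # make_shares() leaves four single-share buckets on the server: two
    # immutable (via remote_allocate_buckets) and two mutable (via
    # remote_slot_testv_and_readv_and_writev). immutable_si_1 and
    # mutable_si_3 each get a second lease via remote_add_lease, which is
    # why the per-SI lease counts checked below come out as 1, 2, 1, 2.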
3046     def test_basic(self):
3047         basedir = "storage/LeaseCrawler/basic"
3048         fileutil.make_dirs(basedir)
3049         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3050         # make it start sooner than usual.
3051         lc = ss.lease_checker
3052         lc.slow_start = 0
3053         lc.cpu_slice = 500
3054         lc.stop_after_first_bucket = True
3055         webstatus = StorageStatus(ss)
3056
3057         # create a few shares, with some leases on them
3058         self.make_shares(ss)
3059         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3060
3061         # add a non-sharefile to exercise another code path
3062         fn = os.path.join(ss.sharedir,
3063                           storage_index_to_dir(immutable_si_0),
3064                           "not-a-share")
3065         f = open(fn, "wb")
3066         f.write("I am not a share.\n")
3067         f.close()
3068
3069         # this is before the crawl has started, so we're not in a cycle yet
3070         initial_state = lc.get_state()
3071         self.failIf(lc.get_progress()["cycle-in-progress"])
3072         self.failIfIn("cycle-to-date", initial_state)
3073         self.failIfIn("estimated-remaining-cycle", initial_state)
3074         self.failIfIn("estimated-current-cycle", initial_state)
3075         self.failUnlessIn("history", initial_state)
3076         self.failUnlessEqual(initial_state["history"], {})
3077
3078         ss.setServiceParent(self.s)
3079
3080         DAY = 24*60*60
3081
3082         d = fireEventually()
3083
3084         # now examine the state right after the first bucket has been
3085         # processed.
3086         def _after_first_bucket(ignored):
3087             initial_state = lc.get_state()
3088             if "cycle-to-date" not in initial_state:
3089                 d2 = fireEventually()
3090                 d2.addCallback(_after_first_bucket)
3091                 return d2
3092             self.failUnlessIn("cycle-to-date", initial_state)
3093             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3094             self.failUnlessIn("estimated-current-cycle", initial_state)
3095             self.failUnlessIn("history", initial_state)
3096             self.failUnlessEqual(initial_state["history"], {})
3097
3098             so_far = initial_state["cycle-to-date"]
3099             self.failUnlessEqual(so_far["expiration-enabled"], False)
3100             self.failUnlessIn("configured-expiration-mode", so_far)
3101             self.failUnlessIn("lease-age-histogram", so_far)
3102             lah = so_far["lease-age-histogram"]
3103             self.failUnlessEqual(type(lah), list)
3104             self.failUnlessEqual(len(lah), 1)
3105             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3106             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3107             self.failUnlessEqual(so_far["corrupt-shares"], [])
3108             sr1 = so_far["space-recovered"]
3109             self.failUnlessEqual(sr1["examined-buckets"], 1)
3110             self.failUnlessEqual(sr1["examined-shares"], 1)
3111             self.failUnlessEqual(sr1["actual-shares"], 0)
3112             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3113             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3114             left = initial_state["estimated-remaining-cycle"]
3115             sr2 = left["space-recovered"]
3116             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3117             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3118             self.failIfEqual(sr2["actual-shares"], None)
3119             self.failIfEqual(sr2["configured-diskbytes"], None)
3120             self.failIfEqual(sr2["original-sharebytes"], None)
3121         d.addCallback(_after_first_bucket)
3122         d.addCallback(lambda ign: self.render1(webstatus))
3123         def _check_html_in_cycle(html):
3124             s = remove_tags(html)
3125             self.failUnlessIn("So far, this cycle has examined "
3126                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3127             self.failUnlessIn("and has recovered: "
3128                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3129                               "0 B (0 B / 0 B)", s)
3130             self.failUnlessIn("If expiration were enabled, "
3131                               "we would have recovered: "
3132                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3133                               " 0 B (0 B / 0 B) by now", s)
3134             self.failUnlessIn("and the remainder of this cycle "
3135                               "would probably recover: "
3136                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3137                               " 0 B (0 B / 0 B)", s)
3138             self.failUnlessIn("and the whole cycle would probably recover: "
3139                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3140                               " 0 B (0 B / 0 B)", s)
3141             self.failUnlessIn("if we were strictly using each lease's default "
3142                               "31-day lease lifetime", s)
3143             self.failUnlessIn("this cycle would be expected to recover: ", s)
3144         d.addCallback(_check_html_in_cycle)
3145
3146         # wait for the crawler to finish the first cycle. Nothing should have
3147         # been removed.
3148         def _wait():
3149             return bool(lc.get_state()["last-cycle-finished"] is not None)
3150         d.addCallback(lambda ign: self.poll(_wait))
3151
3152         def _after_first_cycle(ignored):
3153             s = lc.get_state()
3154             self.failIf("cycle-to-date" in s)
3155             self.failIf("estimated-remaining-cycle" in s)
3156             self.failIf("estimated-current-cycle" in s)
3157             last = s["history"][0]
3158             self.failUnlessIn("cycle-start-finish-times", last)
3159             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3160             self.failUnlessEqual(last["expiration-enabled"], False)
3161             self.failUnlessIn("configured-expiration-mode", last)
3162
3163             self.failUnlessIn("lease-age-histogram", last)
3164             lah = last["lease-age-histogram"]
3165             self.failUnlessEqual(type(lah), list)
3166             self.failUnlessEqual(len(lah), 1)
3167             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3168
3169             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3170             self.failUnlessEqual(last["corrupt-shares"], [])
3171
3172             rec = last["space-recovered"]
3173             self.failUnlessEqual(rec["examined-buckets"], 4)
3174             self.failUnlessEqual(rec["examined-shares"], 4)
3175             self.failUnlessEqual(rec["actual-buckets"], 0)
3176             self.failUnlessEqual(rec["original-buckets"], 0)
3177             self.failUnlessEqual(rec["configured-buckets"], 0)
3178             self.failUnlessEqual(rec["actual-shares"], 0)
3179             self.failUnlessEqual(rec["original-shares"], 0)
3180             self.failUnlessEqual(rec["configured-shares"], 0)
3181             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3182             self.failUnlessEqual(rec["original-diskbytes"], 0)
3183             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3184             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3185             self.failUnlessEqual(rec["original-sharebytes"], 0)
3186             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3187
3188             def _get_sharefile(si):
3189                 return list(ss._iter_share_files(si))[0]
3190             def count_leases(si):
3191                 return len(list(_get_sharefile(si).get_leases()))
3192             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3193             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3194             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3195             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3196         d.addCallback(_after_first_cycle)
3197         d.addCallback(lambda ign: self.render1(webstatus))
3198         def _check_html(html):
3199             s = remove_tags(html)
3200             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3201                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3202             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3203                               "(2 mutable / 2 immutable),", s)
3204             self.failUnlessIn("but expiration was not enabled", s)
3205         d.addCallback(_check_html)
3206         d.addCallback(lambda ign: self.render_json(webstatus))
3207         def _check_json(json):
3208             data = simplejson.loads(json)
3209             self.failUnlessIn("lease-checker", data)
3210             self.failUnlessIn("lease-checker-progress", data)
3211         d.addCallback(_check_json)
3212         return d
3213
3214     def backdate_lease(self, sf, renew_secret, new_expire_time):
3215         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3216         # "renew" a lease with a new_expire_time that is older than what the
3217         # current lease has), so we have to reach inside it.
3218         for i,lease in enumerate(sf.get_leases()):
3219             if lease.renew_secret == renew_secret:
3220                 lease.expiration_time = new_expire_time
3221                 f = open(sf.home, 'rb+')
3222                 sf._write_lease_record(f, i, lease)
3223                 f.close()
3224                 return
3225         raise IndexError("unable to renew non-existent lease")
3226
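    # In "age" mode with a 2000s override, backdating the only lease on
    # immutable_si_0 and mutable_si_2 should make those shares garbage,
    # while immutable_si_1 and mutable_si_3 survive on their remaining
    # (un-backdated) lease; the cycle is therefore expected to report
    # 2 buckets / 2 shares recovered, totalling sf0_size + sf2_size bytes.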
3227     def test_expire_age(self):
3228         basedir = "storage/LeaseCrawler/expire_age"
3229         fileutil.make_dirs(basedir)
3230         # setting expiration_override_lease_duration to 2000 means that any
3231         # lease which is more than 2000s old will be expired.
3232         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3233                                        expiration_enabled=True,
3234                                        expiration_mode="age",
3235                                        expiration_override_lease_duration=2000)
3236         # make it start sooner than usual.
3237         lc = ss.lease_checker
3238         lc.slow_start = 0
3239         lc.stop_after_first_bucket = True
3240         webstatus = StorageStatus(ss)
3241
3242         # create a few shares, with some leases on them
3243         self.make_shares(ss)
3244         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3245
3246         def count_shares(si):
3247             return len(list(ss._iter_share_files(si)))
3248         def _get_sharefile(si):
3249             return list(ss._iter_share_files(si))[0]
3250         def count_leases(si):
3251             return len(list(_get_sharefile(si).get_leases()))
3252
3253         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3254         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3255         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3256         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3257         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3258         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3259         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3260         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3261
3262         # artificially crank back the expiration time on the first lease of
3263         # each share, to make it look like it expired already (age=1000s).
3264         # Some shares have an extra lease which is set to expire at the
3265         # default time in 31 days from now (age=31days). We then run the
3266         # crawler, which will expire the first lease, making some shares get
3267         # deleted and others stay alive (with one remaining lease)
3268         now = time.time()
3269
3270         sf0 = _get_sharefile(immutable_si_0)
3271         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3272         sf0_size = os.stat(sf0.home).st_size
3273
3274         # immutable_si_1 gets an extra lease
3275         sf1 = _get_sharefile(immutable_si_1)
3276         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3277
3278         sf2 = _get_sharefile(mutable_si_2)
3279         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3280         sf2_size = os.stat(sf2.home).st_size
3281
3282         # mutable_si_3 gets an extra lease
3283         sf3 = _get_sharefile(mutable_si_3)
3284         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3285
3286         ss.setServiceParent(self.s)
3287
3288         d = fireEventually()
3289         # examine the state right after the first bucket has been processed
3290         def _after_first_bucket(ignored):
3291             p = lc.get_progress()
3292             if not p["cycle-in-progress"]:
3293                 d2 = fireEventually()
3294                 d2.addCallback(_after_first_bucket)
3295                 return d2
3296         d.addCallback(_after_first_bucket)
3297         d.addCallback(lambda ign: self.render1(webstatus))
3298         def _check_html_in_cycle(html):
3299             s = remove_tags(html)
3300             # the first bucket encountered gets deleted, and its prefix
3301             # happens to be about 1/5th of the way through the ring, so the
3302             # predictor thinks we'll have 5 shares and that we'll delete them
3303             # all. This part of the test depends upon the SIs landing right
3304             # where they do now.
3305             self.failUnlessIn("The remainder of this cycle is expected to "
3306                               "recover: 4 shares, 4 buckets", s)
3307             self.failUnlessIn("The whole cycle is expected to examine "
3308                               "5 shares in 5 buckets and to recover: "
3309                               "5 shares, 5 buckets", s)
3310         d.addCallback(_check_html_in_cycle)
3311
3312         # wait for the crawler to finish the first cycle. Two shares should
3313         # have been removed
3314         def _wait():
3315             return bool(lc.get_state()["last-cycle-finished"] is not None)
3316         d.addCallback(lambda ign: self.poll(_wait))
3317
3318         def _after_first_cycle(ignored):
3319             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3320             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3321             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3322             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3323             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3324             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3325
3326             s = lc.get_state()
3327             last = s["history"][0]
3328
3329             self.failUnlessEqual(last["expiration-enabled"], True)
3330             self.failUnlessEqual(last["configured-expiration-mode"],
3331                                  ("age", 2000, None, ("mutable", "immutable")))
3332             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3333
3334             rec = last["space-recovered"]
3335             self.failUnlessEqual(rec["examined-buckets"], 4)
3336             self.failUnlessEqual(rec["examined-shares"], 4)
3337             self.failUnlessEqual(rec["actual-buckets"], 2)
3338             self.failUnlessEqual(rec["original-buckets"], 2)
3339             self.failUnlessEqual(rec["configured-buckets"], 2)
3340             self.failUnlessEqual(rec["actual-shares"], 2)
3341             self.failUnlessEqual(rec["original-shares"], 2)
3342             self.failUnlessEqual(rec["configured-shares"], 2)
3343             size = sf0_size + sf2_size
3344             self.failUnlessEqual(rec["actual-sharebytes"], size)
3345             self.failUnlessEqual(rec["original-sharebytes"], size)
3346             self.failUnlessEqual(rec["configured-sharebytes"], size)
3347             # different platforms have different notions of "blocks used by
3348             # this file", so merely assert that it's a non-negative number
3349             self.failUnless(rec["actual-diskbytes"] >= 0,
3350                             rec["actual-diskbytes"])
3351             self.failUnless(rec["original-diskbytes"] >= 0,
3352                             rec["original-diskbytes"])
3353             self.failUnless(rec["configured-diskbytes"] >= 0,
3354                             rec["configured-diskbytes"])
3355         d.addCallback(_after_first_cycle)
3356         d.addCallback(lambda ign: self.render1(webstatus))
3357         def _check_html(html):
3358             s = remove_tags(html)
3359             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3360             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3361             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3362         d.addCallback(_check_html)
3363         return d
3364
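    # In "cutoff-date" mode the crawler expires leases that were created or
    # last renewed before the cutoff. Since a lease record only stores an
    # expiration time, the renewal time is evidently inferred as
    # expiration_time minus the 31-day default duration; hence the test sets
    # expiration to now - 3000 + 31 days, i.e. an inferred renewal at
    # now - 3000, which is safely before the cutoff of now - 2000.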
3365     def test_expire_cutoff_date(self):
3366         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3367         fileutil.make_dirs(basedir)
3368         # setting cutoff-date to 2000 seconds ago means that any lease which
3369         # was created or last renewed more than 2000s ago will be expired.
3370         now = time.time()
3371         then = int(now - 2000)
3372         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3373                                        expiration_enabled=True,
3374                                        expiration_mode="cutoff-date",
3375                                        expiration_cutoff_date=then)
3376         # make it start sooner than usual.
3377         lc = ss.lease_checker
3378         lc.slow_start = 0
3379         lc.stop_after_first_bucket = True
3380         webstatus = StorageStatus(ss)
3381
3382         # create a few shares, with some leases on them
3383         self.make_shares(ss)
3384         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3385
3386         def count_shares(si):
3387             return len(list(ss._iter_share_files(si)))
3388         def _get_sharefile(si):
3389             return list(ss._iter_share_files(si))[0]
3390         def count_leases(si):
3391             return len(list(_get_sharefile(si).get_leases()))
3392
3393         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3394         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3395         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3396         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3397         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3398         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3399         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3400         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3401
3402         # artificially crank back the expiration time on the first lease of
3403         # each share, to make it look like it was renewed 3000s ago. To achieve
3404         # this, we need to set the expiration time to now-3000+31days. This
3405         # will change when the lease format is improved to contain both
3406         # create/renew time and duration.
3407         new_expiration_time = now - 3000 + 31*24*60*60
3408
3409         # Some shares have an extra lease which is set to expire at the
3410         # default time in 31 days from now (age=31days). We then run the
3411         # crawler, which will expire the first lease, making some shares get
3412         # deleted and others stay alive (with one remaining lease)
3413
3414         sf0 = _get_sharefile(immutable_si_0)
3415         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3416         sf0_size = os.stat(sf0.home).st_size
3417
3418         # immutable_si_1 gets an extra lease
3419         sf1 = _get_sharefile(immutable_si_1)
3420         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3421
3422         sf2 = _get_sharefile(mutable_si_2)
3423         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3424         sf2_size = os.stat(sf2.home).st_size
3425
3426         # mutable_si_3 gets an extra lease
3427         sf3 = _get_sharefile(mutable_si_3)
3428         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3429
3430         ss.setServiceParent(self.s)
3431
3432         d = fireEventually()
3433         # examine the state right after the first bucket has been processed
3434         def _after_first_bucket(ignored):
3435             p = lc.get_progress()
3436             if not p["cycle-in-progress"]:
3437                 d2 = fireEventually()
3438                 d2.addCallback(_after_first_bucket)
3439                 return d2
3440         d.addCallback(_after_first_bucket)
3441         d.addCallback(lambda ign: self.render1(webstatus))
3442         def _check_html_in_cycle(html):
3443             s = remove_tags(html)
3444             # the first bucket encountered gets deleted, and its prefix
3445             # happens to be about 1/5th of the way through the ring, so the
3446             # predictor thinks we'll have 5 shares and that we'll delete them
3447             # all. This part of the test depends upon the SIs landing right
3448             # where they do now.
3449             self.failUnlessIn("The remainder of this cycle is expected to "
3450                               "recover: 4 shares, 4 buckets", s)
3451             self.failUnlessIn("The whole cycle is expected to examine "
3452                               "5 shares in 5 buckets and to recover: "
3453                               "5 shares, 5 buckets", s)
3454         d.addCallback(_check_html_in_cycle)
3455
3456         # wait for the crawler to finish the first cycle. Two shares should
3457         # have been removed
3458         def _wait():
3459             return bool(lc.get_state()["last-cycle-finished"] is not None)
3460         d.addCallback(lambda ign: self.poll(_wait))
3461
3462         def _after_first_cycle(ignored):
3463             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3464             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3465             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3466             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3467             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3468             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3469
3470             s = lc.get_state()
3471             last = s["history"][0]
3472
3473             self.failUnlessEqual(last["expiration-enabled"], True)
3474             self.failUnlessEqual(last["configured-expiration-mode"],
3475                                  ("cutoff-date", None, then,
3476                                   ("mutable", "immutable")))
3477             self.failUnlessEqual(last["leases-per-share-histogram"],
3478                                  {1: 2, 2: 2})
3479
3480             rec = last["space-recovered"]
3481             self.failUnlessEqual(rec["examined-buckets"], 4)
3482             self.failUnlessEqual(rec["examined-shares"], 4)
3483             self.failUnlessEqual(rec["actual-buckets"], 2)
3484             self.failUnlessEqual(rec["original-buckets"], 0)
3485             self.failUnlessEqual(rec["configured-buckets"], 2)
3486             self.failUnlessEqual(rec["actual-shares"], 2)
3487             self.failUnlessEqual(rec["original-shares"], 0)
3488             self.failUnlessEqual(rec["configured-shares"], 2)
3489             size = sf0_size + sf2_size
3490             self.failUnlessEqual(rec["actual-sharebytes"], size)
3491             self.failUnlessEqual(rec["original-sharebytes"], 0)
3492             self.failUnlessEqual(rec["configured-sharebytes"], size)
3493             # different platforms have different notions of "blocks used by
3494             # this file", so merely assert that it's a non-negative number
3495             self.failUnless(rec["actual-diskbytes"] >= 0,
3496                             rec["actual-diskbytes"])
3497             self.failUnless(rec["original-diskbytes"] >= 0,
3498                             rec["original-diskbytes"])
3499             self.failUnless(rec["configured-diskbytes"] >= 0,
3500                             rec["configured-diskbytes"])
3501         d.addCallback(_after_first_cycle)
3502         d.addCallback(lambda ign: self.render1(webstatus))
3503         def _check_html(html):
3504             s = remove_tags(html)
3505             self.failUnlessIn("Expiration Enabled:"
3506                               " expired leases will be removed", s)
3507             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3508             substr = "Leases created or last renewed before %s will be considered expired." % date
3509             self.failUnlessIn(substr, s)
3510             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3511         d.addCallback(_check_html)
3512         return d
3513
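    # expiration_sharetypes restricts garbage collection to the listed
    # container types. In the two tests below every lease is made expirable,
    # but with ("immutable",) only the two immutable shares are removed, and
    # the mirror-image test_only_mutable removes only the two mutable ones.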
3514     def test_only_immutable(self):
3515         basedir = "storage/LeaseCrawler/only_immutable"
3516         fileutil.make_dirs(basedir)
3517         now = time.time()
3518         then = int(now - 2000)
3519         ss = StorageServer(basedir, "\x00" * 20,
3520                            expiration_enabled=True,
3521                            expiration_mode="cutoff-date",
3522                            expiration_cutoff_date=then,
3523                            expiration_sharetypes=("immutable",))
3524         lc = ss.lease_checker
3525         lc.slow_start = 0
3526         webstatus = StorageStatus(ss)
3527
3528         self.make_shares(ss)
3529         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3530         # set all leases to be expirable
3531         new_expiration_time = now - 3000 + 31*24*60*60
3532
3533         def count_shares(si):
3534             return len(list(ss._iter_share_files(si)))
3535         def _get_sharefile(si):
3536             return list(ss._iter_share_files(si))[0]
3537         def count_leases(si):
3538             return len(list(_get_sharefile(si).get_leases()))
3539
3540         sf0 = _get_sharefile(immutable_si_0)
3541         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3542         sf1 = _get_sharefile(immutable_si_1)
3543         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3544         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3545         sf2 = _get_sharefile(mutable_si_2)
3546         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3547         sf3 = _get_sharefile(mutable_si_3)
3548         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3549         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3550
3551         ss.setServiceParent(self.s)
3552         def _wait():
3553             return bool(lc.get_state()["last-cycle-finished"] is not None)
3554         d = self.poll(_wait)
3555
3556         def _after_first_cycle(ignored):
3557             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3558             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3559             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3560             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3561             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3562             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3563         d.addCallback(_after_first_cycle)
3564         d.addCallback(lambda ign: self.render1(webstatus))
3565         def _check_html(html):
3566             s = remove_tags(html)
3567             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3568         d.addCallback(_check_html)
3569         return d
3570
3571     def test_only_mutable(self):
3572         basedir = "storage/LeaseCrawler/only_mutable"
3573         fileutil.make_dirs(basedir)
3574         now = time.time()
3575         then = int(now - 2000)
3576         ss = StorageServer(basedir, "\x00" * 20,
3577                            expiration_enabled=True,
3578                            expiration_mode="cutoff-date",
3579                            expiration_cutoff_date=then,
3580                            expiration_sharetypes=("mutable",))
3581         lc = ss.lease_checker
3582         lc.slow_start = 0
3583         webstatus = StorageStatus(ss)
3584
3585         self.make_shares(ss)
3586         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3587         # set all leases to be expirable
3588         new_expiration_time = now - 3000 + 31*24*60*60
3589
3590         def count_shares(si):
3591             return len(list(ss._iter_share_files(si)))
3592         def _get_sharefile(si):
3593             return list(ss._iter_share_files(si))[0]
3594         def count_leases(si):
3595             return len(list(_get_sharefile(si).get_leases()))
3596
3597         sf0 = _get_sharefile(immutable_si_0)
3598         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3599         sf1 = _get_sharefile(immutable_si_1)
3600         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3601         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3602         sf2 = _get_sharefile(mutable_si_2)
3603         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3604         sf3 = _get_sharefile(mutable_si_3)
3605         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3606         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3607
3608         ss.setServiceParent(self.s)
3609         def _wait():
3610             return lc.get_state()["last-cycle-finished"] is not None
3611         d = self.poll(_wait)
3612
3613         def _after_first_cycle(ignored):
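                 # the mirror image of the immutable-only case: only the
                 # mutable shares are removed, while the immutable shares
                 # keep all of their (expired) leases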
3614             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3615             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3616             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3617             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3618             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3619             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3620         d.addCallback(_after_first_cycle)
3621         d.addCallback(lambda ign: self.render1(webstatus))
3622         def _check_html(html):
3623             s = remove_tags(html)
3624             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3625         d.addCallback(_check_html)
3626         return d
3627
3628     def test_bad_mode(self):
3629         basedir = "storage/LeaseCrawler/bad_mode"
3630         fileutil.make_dirs(basedir)
3631         e = self.failUnlessRaises(ValueError,
3632                                   StorageServer, basedir, "\x00" * 20,
3633                                   expiration_mode="bogus")
3634         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3635
3636     def test_parse_duration(self):
3637         DAY = 24*60*60
3638         MONTH = 31*DAY
3639         YEAR = 365*DAY
3640         p = time_format.parse_duration
3641         self.failUnlessEqual(p("7days"), 7*DAY)
3642         self.failUnlessEqual(p("31day"), 31*DAY)
3643         self.failUnlessEqual(p("60 days"), 60*DAY)
3644         self.failUnlessEqual(p("2mo"), 2*MONTH)
3645         self.failUnlessEqual(p("3 month"), 3*MONTH)
3646         self.failUnlessEqual(p("2years"), 2*YEAR)
3647         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3648         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3649
3650     def test_parse_date(self):
3651         p = time_format.parse_date
3652         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3653         self.failUnlessEqual(p("2009-03-18"), 1237334400)
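             # sanity check on the expected value: 2009-01-01T00:00:00Z is
             # 1230768000, and 2009-03-18 is 31 + 28 + 17 = 76 days later,
             # so 1230768000 + 76*86400 = 1237334400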
3654
3655     def test_limited_history(self):
3656         basedir = "storage/LeaseCrawler/limited_history"
3657         fileutil.make_dirs(basedir)
3658         ss = StorageServer(basedir, "\x00" * 20)
3659         # make it start sooner than usual.
3660         lc = ss.lease_checker
3661         lc.slow_start = 0
3662         lc.cpu_slice = 500
3663
3664         # create a few shares, with some leases on them
3665         self.make_shares(ss)
3666
3667         ss.setServiceParent(self.s)
3668
3669         def _wait_until_15_cycles_done():
3670             last = lc.state["last-cycle-finished"]
3671             if last is not None and last >= 15:
3672                 return True
3673             if lc.timer:
3674                 lc.timer.reset(0)
3675             return False
3676         d = self.poll(_wait_until_15_cycles_done)
3677
3678         def _check(ignored):
3679             s = lc.get_state()
3680             h = s["history"]
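                 # the lease checker keeps only the ten most recent cycles
                 # in its history, so after cycle 15 finishes we expect
                 # exactly cycles 6 through 15 to be present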
3681             self.failUnlessEqual(len(h), 10)
3682             self.failUnlessEqual(max(h.keys()), 15)
3683             self.failUnlessEqual(min(h.keys()), 6)
3684         d.addCallback(_check)
3685         return d
3686
3687     def test_unpredictable_future(self):
3688         basedir = "storage/LeaseCrawler/unpredictable_future"
3689         fileutil.make_dirs(basedir)
3690         ss = StorageServer(basedir, "\x00" * 20)
3691         # make it start sooner than usual.
3692         lc = ss.lease_checker
3693         lc.slow_start = 0
3694         lc.cpu_slice = -1.0 # stop quickly
3695
3696         self.make_shares(ss)
3697
3698         ss.setServiceParent(self.s)
3699
3700         d = fireEventually()
3701         def _check(ignored):
3702             # this should fire after the first bucket is complete, but before
3703             # the first prefix is complete, so the progress-measurer won't
3704             # think we've gotten far enough to raise our percent-complete
3705             # above 0%, triggering the cannot-predict-the-future code in
3706             # expirer.py. This will have to change if/when the
3707             # progress-measurer gets smart enough to count buckets (we'll
3708             # have to interrupt it even earlier, before it's finished the
3709             # first bucket).
3710             s = lc.get_state()
3711             if "cycle-to-date" not in s:
3712                 d2 = fireEventually()
3713                 d2.addCallback(_check)
3714                 return d2
3715             self.failUnlessIn("cycle-to-date", s)
3716             self.failUnlessIn("estimated-remaining-cycle", s)
3717             self.failUnlessIn("estimated-current-cycle", s)
3718
3719             left = s["estimated-remaining-cycle"]["space-recovered"]
3720             self.failUnlessEqual(left["actual-buckets"], None)
3721             self.failUnlessEqual(left["original-buckets"], None)
3722             self.failUnlessEqual(left["configured-buckets"], None)
3723             self.failUnlessEqual(left["actual-shares"], None)
3724             self.failUnlessEqual(left["original-shares"], None)
3725             self.failUnlessEqual(left["configured-shares"], None)
3726             self.failUnlessEqual(left["actual-diskbytes"], None)
3727             self.failUnlessEqual(left["original-diskbytes"], None)
3728             self.failUnlessEqual(left["configured-diskbytes"], None)
3729             self.failUnlessEqual(left["actual-sharebytes"], None)
3730             self.failUnlessEqual(left["original-sharebytes"], None)
3731             self.failUnlessEqual(left["configured-sharebytes"], None)
3732
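                 # the estimate for the full current cycle should be equally
                 # unknowable at this point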
3733             full = s["estimated-current-cycle"]["space-recovered"]
3734             self.failUnlessEqual(full["actual-buckets"], None)
3735             self.failUnlessEqual(full["original-buckets"], None)
3736             self.failUnlessEqual(full["configured-buckets"], None)
3737             self.failUnlessEqual(full["actual-shares"], None)
3738             self.failUnlessEqual(full["original-shares"], None)
3739             self.failUnlessEqual(full["configured-shares"], None)
3740             self.failUnlessEqual(full["actual-diskbytes"], None)
3741             self.failUnlessEqual(full["original-diskbytes"], None)
3742             self.failUnlessEqual(full["configured-diskbytes"], None)
3743             self.failUnlessEqual(full["actual-sharebytes"], None)
3744             self.failUnlessEqual(full["original-sharebytes"], None)
3745             self.failUnlessEqual(full["configured-sharebytes"], None)
3746
3747         d.addCallback(_check)
3748         return d
3749
3750     def test_no_st_blocks(self):
3751         basedir = "storage/LeaseCrawler/no_st_blocks"
3752         fileutil.make_dirs(basedir)
3753         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3754                                         expiration_mode="age",
3755                                         expiration_override_lease_duration=-1000)
3756         # a negative expiration_override_lease_duration= means the
3757         # "configured-" space-recovered counts will be non-zero, since
3758         # all shares will have expired by then
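             # (No_ST_BLOCKS_StorageServer uses a lease checker whose stat()
             # results lack st_blocks, so the expirer must fall back to the
             # plain file size when computing diskbytes)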
3759
3760         # make it start sooner than usual.
3761         lc = ss.lease_checker
3762         lc.slow_start = 0
3763
3764         self.make_shares(ss)
3765         ss.setServiceParent(self.s)
3766         def _wait():
3767             return lc.get_state()["last-cycle-finished"] is not None
3768         d = self.poll(_wait)
3769
3770         def _check(ignored):
3771             s = lc.get_state()
3772             last = s["history"][0]
3773             rec = last["space-recovered"]
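                 # make_shares() created four shares in four buckets, and
                 # the negative override expires every lease, so all four
                 # appear in the configured- counts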
3774             self.failUnlessEqual(rec["configured-buckets"], 4)
3775             self.failUnlessEqual(rec["configured-shares"], 4)
3776             self.failUnless(rec["configured-sharebytes"] > 0,
3777                             rec["configured-sharebytes"])
3778             # without the .st_blocks field in os.stat() results, we should be
3779             # reporting diskbytes==sharebytes
3780             self.failUnlessEqual(rec["configured-sharebytes"],
3781                                  rec["configured-diskbytes"])
3782         d.addCallback(_check)
3783         return d
3784
3785     def test_share_corruption(self):
3786         self._poll_should_ignore_these_errors = [
3787             UnknownMutableContainerVersionError,
3788             UnknownImmutableContainerVersionError,
3789             ]
3790         basedir = "storage/LeaseCrawler/share_corruption"
3791         fileutil.make_dirs(basedir)
3792         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3793         w = StorageStatus(ss)
3794         # make it start sooner than usual.
3795         lc = ss.lease_checker
3796         lc.stop_after_first_bucket = True
3797         lc.slow_start = 0
3798         lc.cpu_slice = 500
3799
3800         # create a few shares, with some leases on them
3801         self.make_shares(ss)
3802
3803         # now corrupt one, and make sure the lease-checker keeps going
3804         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3805         first = min(self.sis)
3806         first_b32 = base32.b2a(first)
3807         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3808         f = open(fn, "rb+")
3809         f.seek(0)
3810         f.write("BAD MAGIC")
3811         f.close()
3812         # if get_share_file() doesn't see the correct mutable magic, it
3813         # assumes the file is an immutable share, and then
3814         # immutable.ShareFile sees a bad version. So regardless of which kind
3815         # of share we corrupted, this will trigger an
3816         # UnknownImmutableContainerVersionError.
3817
3818         # also create an empty bucket
3819         empty_si = base32.b2a("\x04"*16)
3820         empty_bucket_dir = os.path.join(ss.sharedir,
3821                                         storage_index_to_dir(empty_si))
3822         fileutil.make_dirs(empty_bucket_dir)
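             # the empty bucket should still be counted as an examined
             # bucket by the crawler, even though it contains no shares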
3823
3824         ss.setServiceParent(self.s)
3825
3826         d = fireEventually()
3827
3828         # now examine the state right after the first bucket has been
3829         # processed.
3830         def _after_first_bucket(ignored):
3831             s = lc.get_state()
3832             if "cycle-to-date" not in s:
3833                 d2 = fireEventually()
3834                 d2.addCallback(_after_first_bucket)
3835                 return d2
3836             so_far = s["cycle-to-date"]
3837             rec = so_far["space-recovered"]
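                 # the corrupted share cannot be parsed, so it is reported
                 # under corrupt-shares instead of being counted as an
                 # examined share; its bucket still counts as examined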
3838             self.failUnlessEqual(rec["examined-buckets"], 1)
3839             self.failUnlessEqual(rec["examined-shares"], 0)
3840             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3841         d.addCallback(_after_first_bucket)
3842
3843         d.addCallback(lambda ign: self.render_json(w))
3844         def _check_json(json):
3845             data = simplejson.loads(json)
3846             # grr. json turns all dict keys into strings.
3847             so_far = data["lease-checker"]["cycle-to-date"]
3848             corrupt_shares = so_far["corrupt-shares"]
3849             # it also turns all tuples into lists
3850             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3851         d.addCallback(_check_json)
3852         d.addCallback(lambda ign: self.render1(w))
3853         def _check_html(html):
3854             s = remove_tags(html)
3855             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3856         d.addCallback(_check_html)
3857
3858         def _wait():
3859             return lc.get_state()["last-cycle-finished"] is not None
3860         d.addCallback(lambda ign: self.poll(_wait))
3861
3862         def _after_first_cycle(ignored):
3863             s = lc.get_state()
3864             last = s["history"][0]
3865             rec = last["space-recovered"]
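                 # five buckets = the four storage indices from make_shares()
                 # plus the empty bucket; three shares = the four created
                 # minus the one we corrupted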
3866             self.failUnlessEqual(rec["examined-buckets"], 5)
3867             self.failUnlessEqual(rec["examined-shares"], 3)
3868             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3869         d.addCallback(_after_first_cycle)
3870         d.addCallback(lambda ign: self.render_json(w))
3871         def _check_json_history(json):
3872             data = simplejson.loads(json)
3873             last = data["lease-checker"]["history"]["0"]
3874             corrupt_shares = last["corrupt-shares"]
3875             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3876         d.addCallback(_check_json_history)
3877         d.addCallback(lambda ign: self.render1(w))
3878         def _check_html_history(html):
3879             s = remove_tags(html)
3880             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3881         d.addCallback(_check_html_history)
3882
3883         def _cleanup(res):
3884             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3885                                    UnknownImmutableContainerVersionError)
3886             return res
3887         d.addBoth(_cleanup)
3888         return d
3889
3890     def render_json(self, page):
3891         d = self.render1(page, args={"t": ["json"]})
3892         return d
3893
3894 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3895
3896     def setUp(self):
3897         self.s = service.MultiService()
3898         self.s.startService()
3899     def tearDown(self):
3900         return self.s.stopService()
3901
3902     def test_no_server(self):
3903         w = StorageStatus(None)
3904         html = w.renderSynchronously()
3905         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3906
3907     def test_status(self):
3908         basedir = "storage/WebStatus/status"
3909         fileutil.make_dirs(basedir)
3910         ss = StorageServer(basedir, "\x00" * 20)
3911         ss.setServiceParent(self.s)
3912         w = StorageStatus(ss)
3913         d = self.render1(w)
3914         def _check_html(html):
3915             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3916             s = remove_tags(html)
3917             self.failUnlessIn("Accepting new shares: Yes", s)
3918             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3919         d.addCallback(_check_html)
3920         d.addCallback(lambda ign: self.render_json(w))
3921         def _check_json(json):
3922             data = simplejson.loads(json)
3923             s = data["stats"]
3924             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3925             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3926             self.failUnlessIn("bucket-counter", data)
3927             self.failUnlessIn("lease-checker", data)
3928         d.addCallback(_check_json)
3929         return d
3930
3931     def render_json(self, page):
3932         d = self.render1(page, args={"t": ["json"]})
3933         return d
3934
3935     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3936     def test_status_no_disk_stats(self, mock_get_disk_stats):
3937         mock_get_disk_stats.side_effect = AttributeError()
3938
3939         # Some platforms may have no disk stats API. Make sure the code can handle that
3940         # (test runs on all platforms).
3941         basedir = "storage/WebStatus/status_no_disk_stats"
3942         fileutil.make_dirs(basedir)
3943         ss = StorageServer(basedir, "\x00" * 20)
3944         ss.setServiceParent(self.s)
3945         w = StorageStatus(ss)
3946         html = w.renderSynchronously()
3947         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3948         s = remove_tags(html)
3949         self.failUnlessIn("Accepting new shares: Yes", s)
3950         self.failUnlessIn("Total disk space: ?", s)
3951         self.failUnlessIn("Space Available to Tahoe: ?", s)
3952         self.failUnless(ss.get_available_space() is None)
3953
3954     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3955     def test_status_bad_disk_stats(self, mock_get_disk_stats):
3956         mock_get_disk_stats.side_effect = OSError()
3957
3958         # If the API to get disk stats exists but a call to it fails, then the status should
3959         # show that no shares will be accepted, and get_available_space() should be 0.
3960         basedir = "storage/WebStatus/status_bad_disk_stats"
3961         fileutil.make_dirs(basedir)
3962         ss = StorageServer(basedir, "\x00" * 20)
3963         ss.setServiceParent(self.s)
3964         w = StorageStatus(ss)
3965         html = w.renderSynchronously()
3966         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3967         s = remove_tags(html)
3968         self.failUnlessIn("Accepting new shares: No", s)
3969         self.failUnlessIn("Total disk space: ?", s)
3970         self.failUnlessIn("Space Available to Tahoe: ?", s)
3971         self.failUnlessEqual(ss.get_available_space(), 0)
3972
3973     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3974     def test_status_right_disk_stats(self, mock_get_disk_stats):
3975         GB = 1000000000
3976         total            = 5*GB
3977         free_for_root    = 4*GB
3978         free_for_nonroot = 3*GB
3979         reserved_space   = 1*GB
3980         used = total - free_for_root
3981         avail = max(free_for_nonroot - reserved_space, 0)
3982         mock_get_disk_stats.return_value = {
3983             'total': total,
3984             'free_for_root': free_for_root,
3985             'free_for_nonroot': free_for_nonroot,
3986             'used': used,
3987             'avail': avail,
3988         }
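             # with these numbers: used = 5 GB - 4 GB = 1 GB, and space
             # available to Tahoe = max(3 GB - 1 GB, 0) = 2 GB, which is
             # what the rendered page and get_available_space() should
             # report below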
3989
3990         basedir = "storage/WebStatus/status_right_disk_stats"
3991         fileutil.make_dirs(basedir)
3992         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
3993         expecteddir = ss.sharedir
3994         ss.setServiceParent(self.s)
3995         w = StorageStatus(ss)
3996         html = w.renderSynchronously()
3997
3998         self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
3999                     mock_get_disk_stats.call_args_list)
4000
4001         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4002         s = remove_tags(html)
4003         self.failUnlessIn("Total disk space: 5.00 GB", s)
4004         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4005         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4006         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4007         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4008         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4009         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4010
4011     def test_readonly(self):
4012         basedir = "storage/WebStatus/readonly"
4013         fileutil.make_dirs(basedir)
4014         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4015         ss.setServiceParent(self.s)
4016         w = StorageStatus(ss)
4017         html = w.renderSynchronously()
4018         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4019         s = remove_tags(html)
4020         self.failUnlessIn("Accepting new shares: No", s)
4021
4022     def test_reserved(self):
4023         basedir = "storage/WebStatus/reserved"
4024         fileutil.make_dirs(basedir)
4025         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4026         ss.setServiceParent(self.s)
4027         w = StorageStatus(ss)
4028         html = w.renderSynchronously()
4029         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4030         s = remove_tags(html)
4031         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4032
4033     def test_huge_reserved(self):
4034         basedir = "storage/WebStatus/huge_reserved"
4035         fileutil.make_dirs(basedir)
4036         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e9)
4037         ss.setServiceParent(self.s)
4038         w = StorageStatus(ss)
4039         html = w.renderSynchronously()
4040         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4041         s = remove_tags(html)
4042         self.failUnlessIn("Reserved space: - 10.00 GB (10000000000)", s)
4043
4044     def test_util(self):
4045         w = StorageStatus(None)
4046         self.failUnlessEqual(w.render_space(None, None), "?")
4047         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4048         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4049         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4050         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4051         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)