import time, os.path, platform, stat, re, simplejson, struct, shutil

import mock

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
                                     SIGNED_PREFIX, MDMFHEADER, \
                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
                                     PRIVATE_KEY_SIZE, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
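# FakeCanary stands in for the foolscap "canary" object that the real
# storage protocol hands to allocate_buckets: callers register disconnect
# handlers on it and get back an opaque Marker they can use to deregister
# later. Tests fire the stored handlers by hand to simulate a client
# disconnecting.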
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

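# In these tests the TestCase instance itself is passed to BucketWriter
# and BucketReader as the storage server: all the "server" needs to
# provide is the bucket_writer_closed/add_latency/count callbacks defined
# below, which are no-ops here.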
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

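        # A minimal sketch of the v1 immutable container layout, as this
        # test assumes it: a header of three big-endian uint32s (container
        # version, share-data size, lease count), then the share data,
        # then the appended leases. Here that is (1, 1, 1): version 1, one
        # byte of share data, and one lease.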
        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        mockstorageserver = mock.Mock()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)

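# RemoteBucket is a loopback stand-in for a foolscap RemoteReference:
# callRemote() dispatches to the matching remote_* method on self.target
# (which the caller must set) and wraps the result in a Deferred via
# maybeDeferred, while counting read and write calls so tests can assert
# how chatty a proxy was.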
class RemoteBucket:

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
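        #
        # (Reading of the header sizes, inferred from
        # allmydata/immutable/layout.py rather than stated here: 0x24 = 36
        # is the v1 header, a 4-byte version plus eight more 4-byte fields
        # (segment size, data size, and six section offsets). 0x44 = 68 is
        # the v2 header, which widens those eight fields to 8 bytes each
        # so shares can exceed 4 GiB.)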

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)

    def test_declares_maximum_share_sizes(self):
        ss = self.create("test_declares_maximum_share_sizes")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnlessIn('maximum-immutable-share-size', sv1)
        self.failUnlessIn('maximum-mutable-share-size', sv1)

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems without efficient sparse-file support, and Mac OS X and Windows lack it.")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but is not the
        exact same storage index) does not add a new entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
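        # wb.incominghome is the temporary share file; walking up its
        # parents gives the bucket (storage-index) directory, the prefix
        # directory, and the incoming/ directory itself. Closing the last
        # writer should prune the now-empty bucket and prefix directories
        # but leave incoming/ in place.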
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
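        # (These match the v1 immutable container used above: OVERHEAD is
        # the three uint32 header fields, and LEASE_SIZE is one lease --
        # owner number (4) + renew secret (32) + cancel secret (32) +
        # expiration time (4) = 72 bytes.)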
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually increase available, since we're not doing real
        # disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

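        # Each (renew_secret, cancel_secret) pair identifies one lease on a
        # bucket: re-allocating with the same secrets renews the existing
        # lease, while new secrets (or remote_add_lease) add another one.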
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.failIf(hasattr(ss, 'remote_cancel_lease'), \
                        "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
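        # Each entry in the test-and-write vector dict maps a share number
        # to (test_vector, write_vector, new_length). An empty test vector
        # always matches, an empty write vector writes nothing, and
        # new_length=None leaves the container size alone -- so this call
        # just creates empty slots.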
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
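        # Every mutable container starts with a fixed magic string; clobber
        # it and the server should refuse to treat the file as a share.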
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # Trying to make the container too large (by sending a write vector
        # whose offset is too high) will raise an exception.
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(TOOBIG,data)], None)},
                              [])

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], None)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:10]]})

        # Sending a new_length shorter than the current length truncates the
        # data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 9)},
                         [])
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a new_length longer than the current length doesn't change
        # the data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        read_answer = read("si1", [0], [(0, 20)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a write vector whose start is after the end of the current
        # data doesn't reveal "whatever was there last time" (palimpsest),
        # but instead fills with zeroes.

        # To test this, we fill the data area with a recognizable pattern.
        pattern = ''.join([chr(i) for i in range(100)])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0, pattern)], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Then truncate the data...
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Just confirm that you get an empty string if you try to read from
        # past the (new) endpoint now.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 1980)])
        self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))

        # Then extend the file by writing a vector which starts past the
        # end...
        answer = rstaraw("si1", secrets,
                         {0: ([], [(50, 'hellothere')], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Now if you read the stuff between 20 (where we earlier truncated)
        # and 50, it had better be all zeroes.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 30)])
        self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))

        # Also see if the server explicitly declares that it supports this
        # feature.
        ver = ss.remote_get_version()
        storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
        self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))

        # If the size is dropped to zero the share is deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


    def test_operators(self):
        # Test the comparison operators; the data being compared is '11111'
        # in all cases. Exercise both failing and passing cases, and reset
        # the data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

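        # Each test-vector entry is (offset, length, operator, specimen):
        # the share's existing bytes at [offset:offset+length] are compared
        # against the specimen with the named operator, and the write is
        # applied only if every comparison succeeds. Note that the returned
        # read data is gathered before any write is applied, which is why a
        # successful write below still reports the old "11111".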
        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

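    # Two comparators: compare_leases_without_timestamps ignores
    # expiration_time, which moves whenever a lease is renewed, so the
    # container-expansion check in test_leases can verify that the other
    # lease fields survive intact without being sensitive to renewal
    # timing.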
1201     def compare_leases_without_timestamps(self, leases_a, leases_b):
1202         self.failUnlessEqual(len(leases_a), len(leases_b))
1203         for i in range(len(leases_a)):
1204             a = leases_a[i]
1205             b = leases_b[i]
1206             self.failUnlessEqual(a.owner_num,       b.owner_num)
1207             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1208             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1209             self.failUnlessEqual(a.nodeid,          b.nodeid)
1210
1211     def compare_leases(self, leases_a, leases_b):
1212         self.failUnlessEqual(len(leases_a), len(leases_b))
1213         for i in range(len(leases_a)):
1214             a = leases_a[i]
1215             b = leases_b[i]
1216             self.failUnlessEqual(a.owner_num,       b.owner_num)
1217             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1218             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1219             self.failUnlessEqual(a.nodeid,          b.nodeid)
1220             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1221
1222     def test_leases(self):
1223         ss = self.create("test_leases")
1224         def secrets(n):
1225             return ( self.write_enabler("we1"),
1226                      self.renew_secret("we1-%d" % n),
1227                      self.cancel_secret("we1-%d" % n) )
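        # (Each distinct pair of renew/cancel secrets establishes a
        # separate lease on the share; re-using the same pair just renews
        # the existing lease, as the assertions below demonstrate.)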
1228         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1229         write = ss.remote_slot_testv_and_readv_and_writev
1230         read = ss.remote_slot_readv
1231         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1232         self.failUnlessEqual(rc, (True, {}))
1233
1234         # create a random non-numeric file in the bucket directory, to
1235         # exercise the code that's supposed to ignore those.
1236         bucket_dir = os.path.join(self.workdir("test_leases"),
1237                                   "shares", storage_index_to_dir("si1"))
1238         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1239         f.write("you ought to be ignoring me\n")
1240         f.close()
1241
1242         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1243         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1244
1245         # add-lease on a missing storage index is silently ignored
1246         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1247
1248         # re-allocate the slots and use the same secrets, that should update
1249         # the lease
1250         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1251         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1252
1253         # renew it directly
1254         ss.remote_renew_lease("si1", secrets(0)[1])
1255         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1256
1257         # now allocate them with a bunch of different secrets, to trigger the
1258         # extended lease code. Use add_lease for one of them.
1259         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1260         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1261         secrets2 = secrets(2)
1262         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1263         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1264         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1265         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1266         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1267
1268         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1269
1270         all_leases = list(s0.get_leases())
1271         # and write enough data to expand the container, forcing the server
1272         # to move the leases
1273         write("si1", secrets(0),
1274               {0: ([], [(0,data)], 200), },
1275               [])
1276
1277         # read back the leases, make sure they're still intact.
1278         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1279
1280         ss.remote_renew_lease("si1", secrets(0)[1])
1281         ss.remote_renew_lease("si1", secrets(1)[1])
1282         ss.remote_renew_lease("si1", secrets(2)[1])
1283         ss.remote_renew_lease("si1", secrets(3)[1])
1284         ss.remote_renew_lease("si1", secrets(4)[1])
1285         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1286         # get a new copy of the leases, with the current timestamps. Reading
1287         # data and failing to renew/cancel leases should leave the timestamps
1288         # alone.
1289         all_leases = list(s0.get_leases())
1290         # renewing with a bogus token should prompt an error message
1291
1292         # examine the exception thus raised, make sure the old nodeid is
1293         # present, to provide for share migration
1294         e = self.failUnlessRaises(IndexError,
1295                                   ss.remote_renew_lease, "si1",
1296                                   secrets(20)[1])
1297         e_s = str(e)
1298         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1299         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1300         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1301
1302         self.compare_leases(all_leases, list(s0.get_leases()))
1303
1304         # reading shares should not modify the timestamp
1305         read("si1", [], [(0,200)])
1306         self.compare_leases(all_leases, list(s0.get_leases()))
1307
1308         write("si1", secrets(0),
1309               {0: ([], [(200, "make me bigger")], None)}, [])
1310         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1311
1312         write("si1", secrets(0),
1313               {0: ([], [(500, "make me really bigger")], None)}, [])
1314         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1315
1316     def test_remove(self):
1317         ss = self.create("test_remove")
1318         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1319                       set([0,1,2]), 100)
1320         readv = ss.remote_slot_readv
1321         writev = ss.remote_slot_testv_and_readv_and_writev
1322         secrets = ( self.write_enabler("we1"),
1323                     self.renew_secret("we1"),
1324                     self.cancel_secret("we1") )
1325         # delete sh0 by setting its size to zero
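        # (The third element of each writev tuple is new_length; truncating
        # a mutable share to zero bytes deletes it outright, as the reads
        # below confirm.)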
1326         answer = writev("si1", secrets,
1327                         {0: ([], [], 0)},
1328                         [])
1329         # the answer should mention all the shares that existed before the
1330         # write
1331         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1332         # but a new read should show only sh1 and sh2
1333         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1334                              {1: [""], 2: [""]})
1335
1336         # delete sh1 by setting its size to zero
1337         answer = writev("si1", secrets,
1338                         {1: ([], [], 0)},
1339                         [])
1340         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1341         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1342                              {2: [""]})
1343
1344         # delete sh2 by setting its size to zero
1345         answer = writev("si1", secrets,
1346                         {2: ([], [], 0)},
1347                         [])
1348         self.failUnlessEqual(answer, (True, {2:[]}) )
1349         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1350                              {})
1351         # and the bucket directory should now be gone
1352         si = base32.b2a("si1")
1353         # note: this is a detail of the storage server implementation, and
1354         # may change in the future
1355         prefix = si[:2]
1356         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1357         bucketdir = os.path.join(prefixdir, si)
1358         self.failUnless(os.path.exists(prefixdir), prefixdir)
1359         self.failIf(os.path.exists(bucketdir), bucketdir)
1360
1361
1362 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1363     def setUp(self):
1364         self.sparent = LoggingServiceParent()
1365         self._lease_secret = itertools.count()
1366         self.ss = self.create("MDMFProxies storage test server")
1367         self.rref = RemoteBucket()
1368         self.rref.target = self.ss
1369         self.secrets = (self.write_enabler("we_secret"),
1370                         self.renew_secret("renew_secret"),
1371                         self.cancel_secret("cancel_secret"))
1372         self.segment = "aaaaaa"
1373         self.block = "aa"
1374         self.salt = "a" * 16
1375         self.block_hash = "a" * 32
1376         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1377         self.share_hash = self.block_hash
1378         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1379         self.signature = "foobarbaz"
1380         self.verification_key = "vvvvvv"
1381         self.encprivkey = "private"
1382         self.root_hash = self.block_hash
1383         self.salt_hash = self.root_hash
1384         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1385         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1386         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1387         # blockhashes and salt hashes are serialized in the same way,
1388         # only we lop off the first element and store that in the
1389         # header.
1390         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1391
1392
1393     def tearDown(self):
1394         self.sparent.stopService()
1395         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1396
1397
1398     def write_enabler(self, we_tag):
1399         return hashutil.tagged_hash("we_blah", we_tag)
1400
1401
1402     def renew_secret(self, tag):
1403         return hashutil.tagged_hash("renew_blah", str(tag))
1404
1405
1406     def cancel_secret(self, tag):
1407         return hashutil.tagged_hash("cancel_blah", str(tag))
1408
1409
1410     def workdir(self, name):
1411         basedir = os.path.join("storage", "MutableServer", name)
1412         return basedir
1413
1414
1415     def create(self, name):
1416         workdir = self.workdir(name)
1417         ss = StorageServer(workdir, "\x00" * 20)
1418         ss.setServiceParent(self.sparent)
1419         return ss
1420
1421
1422     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1423         # Start with the checkstring
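        # (>BQ32s packs the version byte -- 1 for MDMF -- the 8-byte
        # sequence number, and the 32-byte root hash.)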
1424         data = struct.pack(">BQ32s",
1425                            1,
1426                            0,
1427                            self.root_hash)
1428         self.checkstring = data
1429         # Next, the encoding parameters
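        # (>BBQQ packs k, n, segment size, and data length. These tests use
        # k=3 and n=10 throughout, with 6-byte segments and a 36-byte file,
        # or 33 bytes when simulating a short tail segment.)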
1430         if tail_segment:
1431             data += struct.pack(">BBQQ",
1432                                 3,
1433                                 10,
1434                                 6,
1435                                 33)
1436         elif empty:
1437             data += struct.pack(">BBQQ",
1438                                 3,
1439                                 10,
1440                                 0,
1441                                 0)
1442         else:
1443             data += struct.pack(">BBQQ",
1444                                 3,
1445                                 10,
1446                                 6,
1447                                 36)
1448         # Now we'll build the offsets.
1449         sharedata = ""
1450         if not tail_segment and not empty:
1451             for i in xrange(6):
1452                 sharedata += self.salt + self.block
1453         elif tail_segment:
1454             for i in xrange(5):
1455                 sharedata += self.salt + self.block
1456             sharedata += self.salt + "a"
1457
1458         # The encrypted private key comes after the shares + salts
1459         offset_size = struct.calcsize(MDMFOFFSETS)
1460         encrypted_private_key_offset = len(data) + offset_size
1461         # The share hash chain comes after the private key
1462         sharehashes_offset = encrypted_private_key_offset + \
1463             len(self.encprivkey)
1464
1465         # The signature comes after the share hash chain.
1466         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1467
1468         verification_key_offset = signature_offset + len(self.signature)
1469         verification_key_end = verification_key_offset + \
1470             len(self.verification_key)
1471
1472         share_data_offset = offset_size
1473         share_data_offset += PRIVATE_KEY_SIZE
1474         share_data_offset += SIGNATURE_SIZE
1475         share_data_offset += VERIFICATION_KEY_SIZE
1476         share_data_offset += SHARE_HASH_CHAIN_SIZE
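        # (The MDMF layout reserves fixed-size regions for the fields
        # above, so share data always begins at the same offset even though
        # our test values are shorter than the reserved sizes; the gap is
        # padded with filler further down.)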
1477
1478         blockhashes_offset = share_data_offset + len(sharedata)
1479         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1480
1481         data += struct.pack(MDMFOFFSETS,
1482                             encrypted_private_key_offset,
1483                             sharehashes_offset,
1484                             signature_offset,
1485                             verification_key_offset,
1486                             verification_key_end,
1487                             share_data_offset,
1488                             blockhashes_offset,
1489                             eof_offset)
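        # (MDMFOFFSETS is the pack format for eight 8-byte big-endian
        # offsets, one per value above, matching the 59 + 8*i header
        # positions that test_write checks below.)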
1490
1491         self.offsets = {}
1492         self.offsets['enc_privkey'] = encrypted_private_key_offset
1493         self.offsets['block_hash_tree'] = blockhashes_offset
1494         self.offsets['share_hash_chain'] = sharehashes_offset
1495         self.offsets['signature'] = signature_offset
1496         self.offsets['verification_key'] = verification_key_offset
1497         self.offsets['share_data'] = share_data_offset
1498         self.offsets['verification_key_end'] = verification_key_end
1499         self.offsets['EOF'] = eof_offset
1500
1501         # the private key,
1502         data += self.encprivkey
1503         # the sharehashes
1504         data += self.share_hash_chain_s
1505         # the signature,
1506         data += self.signature
1507         # and the verification key
1508         data += self.verification_key
1509         # Then we'll add in gibberish until we get to the right point.
1510         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1511         data += nulls
1512
1513         # Then the share data
1514         data += sharedata
1515         # the blockhashes
1516         data += self.block_hash_tree_s
1517         return data
1518
1519
1520     def write_test_share_to_server(self,
1521                                    storage_index,
1522                                    tail_segment=False,
1523                                    empty=False):
1524         """
1525         I write some data to self.ss for the read tests to read.
1526
1527         If tail_segment=True, then I will write a share that has a
1528         smaller tail segment than other segments.
1529         """
1530         write = self.ss.remote_slot_testv_and_readv_and_writev
1531         data = self.build_test_mdmf_share(tail_segment, empty)
1532         # Finally, we write the whole thing to the storage server in one
1533         # pass.
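        # The single test vector asserts that the first byte of the share
        # equals the empty string -- i.e. that no share exists there yet.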
1534         testvs = [(0, 1, "eq", "")]
1535         tws = {}
1536         tws[0] = (testvs, [(0, data)], None)
1537         readv = [(0, 1)]
1538         results = write(storage_index, self.secrets, tws, readv)
1539         self.failUnless(results[0])
1540
1541
1542     def build_test_sdmf_share(self, empty=False):
1543         if empty:
1544             sharedata = ""
1545         else:
1546             sharedata = self.segment * 6
1547         self.sharedata = sharedata
1548         blocksize = len(sharedata) / 3
1549         block = sharedata[:blocksize]
1550         self.blockdata = block
1551         prefix = struct.pack(">BQ32s16s BBQQ",
1552                              0, # version,
1553                              0,
1554                              self.root_hash,
1555                              self.salt,
1556                              3,
1557                              10,
1558                              len(sharedata),
1559                              len(sharedata),
1560                             )
1561         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
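        # (That format is 1 + 8 + 32 + 16 + 1 + 1 + 8 + 8 = 75 header bytes
        # plus four 4-byte and two 8-byte offsets: 107 bytes in all.)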
1562         signature_offset = post_offset + len(self.verification_key)
1563         sharehashes_offset = signature_offset + len(self.signature)
1564         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1565         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1566         encprivkey_offset = sharedata_offset + len(block)
1567         eof_offset = encprivkey_offset + len(self.encprivkey)
1568         offsets = struct.pack(">LLLLQQ",
1569                               signature_offset,
1570                               sharehashes_offset,
1571                               blockhashes_offset,
1572                               sharedata_offset,
1573                               encprivkey_offset,
1574                               eof_offset)
1575         final_share = "".join([prefix,
1576                            offsets,
1577                            self.verification_key,
1578                            self.signature,
1579                            self.share_hash_chain_s,
1580                            self.block_hash_tree_s,
1581                            block,
1582                            self.encprivkey])
1583         self.offsets = {}
1584         self.offsets['signature'] = signature_offset
1585         self.offsets['share_hash_chain'] = sharehashes_offset
1586         self.offsets['block_hash_tree'] = blockhashes_offset
1587         self.offsets['share_data'] = sharedata_offset
1588         self.offsets['enc_privkey'] = encprivkey_offset
1589         self.offsets['EOF'] = eof_offset
1590         return final_share
1591
1592
1593     def write_sdmf_share_to_server(self,
1594                                    storage_index,
1595                                    empty=False):
1596         # Some tests need SDMF shares to verify that we can still
1597         # read them. This method writes one, which resembles but is not
        # identical to a share produced by the real SDMF publisher.
1598         assert self.rref
1599         write = self.ss.remote_slot_testv_and_readv_and_writev
1600         share = self.build_test_sdmf_share(empty)
1601         testvs = [(0, 1, "eq", "")]
1602         tws = {}
1603         tws[0] = (testvs, [(0, share)], None)
1604         readv = []
1605         results = write(storage_index, self.secrets, tws, readv)
1606         self.failUnless(results[0])
1607
1608
1609     def test_read(self):
1610         self.write_test_share_to_server("si1")
1611         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1612         # Check that every method equals what we expect it to.
1613         d = defer.succeed(None)
1614         def _check_block_and_salt((block, salt)):
1615             self.failUnlessEqual(block, self.block)
1616             self.failUnlessEqual(salt, self.salt)
1617
1618         for i in xrange(6):
1619             d.addCallback(lambda ignored, i=i:
1620                 mr.get_block_and_salt(i))
1621             d.addCallback(_check_block_and_salt)
1622
1623         d.addCallback(lambda ignored:
1624             mr.get_encprivkey())
1625         d.addCallback(lambda encprivkey:
1626             self.failUnlessEqual(self.encprivkey, encprivkey))
1627
1628         d.addCallback(lambda ignored:
1629             mr.get_blockhashes())
1630         d.addCallback(lambda blockhashes:
1631             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1632
1633         d.addCallback(lambda ignored:
1634             mr.get_sharehashes())
1635         d.addCallback(lambda sharehashes:
1636             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1637
1638         d.addCallback(lambda ignored:
1639             mr.get_signature())
1640         d.addCallback(lambda signature:
1641             self.failUnlessEqual(signature, self.signature))
1642
1643         d.addCallback(lambda ignored:
1644             mr.get_verification_key())
1645         d.addCallback(lambda verification_key:
1646             self.failUnlessEqual(verification_key, self.verification_key))
1647
1648         d.addCallback(lambda ignored:
1649             mr.get_seqnum())
1650         d.addCallback(lambda seqnum:
1651             self.failUnlessEqual(seqnum, 0))
1652
1653         d.addCallback(lambda ignored:
1654             mr.get_root_hash())
1655         d.addCallback(lambda root_hash:
1656             self.failUnlessEqual(self.root_hash, root_hash))
1657
1658         d.addCallback(lambda ignored:
1659             mr.get_seqnum())
1660         d.addCallback(lambda seqnum:
1661             self.failUnlessEqual(0, seqnum))
1662
1663         d.addCallback(lambda ignored:
1664             mr.get_encoding_parameters())
1665         def _check_encoding_parameters((k, n, segsize, datalen)):
1666             self.failUnlessEqual(k, 3)
1667             self.failUnlessEqual(n, 10)
1668             self.failUnlessEqual(segsize, 6)
1669             self.failUnlessEqual(datalen, 36)
1670         d.addCallback(_check_encoding_parameters)
1671
1672         d.addCallback(lambda ignored:
1673             mr.get_checkstring())
1674         d.addCallback(lambda checkstring:
1675             self.failUnlessEqual(checkstring, self.checkstring))
1676         return d
1677
1678
1679     def test_read_with_different_tail_segment_size(self):
1680         self.write_test_share_to_server("si1", tail_segment=True)
1681         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1682         d = mr.get_block_and_salt(5)
1683         def _check_tail_segment(results):
1684             block, salt = results
1685             self.failUnlessEqual(len(block), 1)
1686             self.failUnlessEqual(block, "a")
1687         d.addCallback(_check_tail_segment)
1688         return d
1689
1690
1691     def test_get_block_with_invalid_segnum(self):
1692         self.write_test_share_to_server("si1")
1693         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1694         d = defer.succeed(None)
1695         d.addCallback(lambda ignored:
1696             self.shouldFail(LayoutInvalid, "test invalid segnum",
1697                             None,
1698                             mr.get_block_and_salt, 7))
1699         return d
1700
1701
1702     def test_get_encoding_parameters_first(self):
1703         self.write_test_share_to_server("si1")
1704         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1705         d = mr.get_encoding_parameters()
1706         def _check_encoding_parameters((k, n, segment_size, datalen)):
1707             self.failUnlessEqual(k, 3)
1708             self.failUnlessEqual(n, 10)
1709             self.failUnlessEqual(segment_size, 6)
1710             self.failUnlessEqual(datalen, 36)
1711         d.addCallback(_check_encoding_parameters)
1712         return d
1713
1714
1715     def test_get_seqnum_first(self):
1716         self.write_test_share_to_server("si1")
1717         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1718         d = mr.get_seqnum()
1719         d.addCallback(lambda seqnum:
1720             self.failUnlessEqual(seqnum, 0))
1721         return d
1722
1723
1724     def test_get_root_hash_first(self):
1725         self.write_test_share_to_server("si1")
1726         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1727         d = mr.get_root_hash()
1728         d.addCallback(lambda root_hash:
1729             self.failUnlessEqual(root_hash, self.root_hash))
1730         return d
1731
1732
1733     def test_get_checkstring_first(self):
1734         self.write_test_share_to_server("si1")
1735         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1736         d = mr.get_checkstring()
1737         d.addCallback(lambda checkstring:
1738             self.failUnlessEqual(checkstring, self.checkstring))
1739         return d
1740
1741
1742     def test_write_read_vectors(self):
1743         # When we write, the storage server will return to us the read
1744         # vector we asked for, along with the write's result. If a write
1745         # fails because the test vectors failed, this read vector can
1746         # help us to diagnose the problem. This test ensures that the
1747         # read vector is working appropriately.
1748         mw = self._make_new_mw("si1", 0)
1749
1750         for i in xrange(6):
1751             mw.put_block(self.block, i, self.salt)
1752         mw.put_encprivkey(self.encprivkey)
1753         mw.put_blockhashes(self.block_hash_tree)
1754         mw.put_sharehashes(self.share_hash_chain)
1755         mw.put_root_hash(self.root_hash)
1756         mw.put_signature(self.signature)
1757         mw.put_verification_key(self.verification_key)
1758         d = mw.finish_publishing()
1759         def _then(results):
1760             self.failUnlessEqual(len(results), 2)
1761             result, readv = results
1762             self.failUnless(result)
1763             self.failIf(readv)
1764             self.old_checkstring = mw.get_checkstring()
1765             mw.set_checkstring("")
1766         d.addCallback(_then)
1767         d.addCallback(lambda ignored:
1768             mw.finish_publishing())
1769         def _then_again(results):
1770             self.failUnlessEqual(len(results), 2)
1771             result, readvs = results
1772             self.failIf(result)
1773             self.failUnlessIn(0, readvs)
1774             readv = readvs[0][0]
1775             self.failUnlessEqual(readv, self.old_checkstring)
1776         d.addCallback(_then_again)
1777         # The checkstring remains the same for the rest of the process.
1778         return d
1779
1780
1781     def test_private_key_after_share_hash_chain(self):
1782         mw = self._make_new_mw("si1", 0)
1783         d = defer.succeed(None)
1784         for i in xrange(6):
1785             d.addCallback(lambda ignored, i=i:
1786                 mw.put_block(self.block, i, self.salt))
1787         d.addCallback(lambda ignored:
1788             mw.put_encprivkey(self.encprivkey))
1789         d.addCallback(lambda ignored:
1790             mw.put_sharehashes(self.share_hash_chain))
1791
1792         # Now try to put the private key again.
1793         d.addCallback(lambda ignored:
1794             self.shouldFail(LayoutInvalid, "test repeat private key",
1795                             None,
1796                             mw.put_encprivkey, self.encprivkey))
1797         return d
1798
1799
1800     def test_signature_after_verification_key(self):
1801         mw = self._make_new_mw("si1", 0)
1802         d = defer.succeed(None)
1803         # Put everything up to and including the verification key.
1804         for i in xrange(6):
1805             d.addCallback(lambda ignored, i=i:
1806                 mw.put_block(self.block, i, self.salt))
1807         d.addCallback(lambda ignored:
1808             mw.put_encprivkey(self.encprivkey))
1809         d.addCallback(lambda ignored:
1810             mw.put_blockhashes(self.block_hash_tree))
1811         d.addCallback(lambda ignored:
1812             mw.put_sharehashes(self.share_hash_chain))
1813         d.addCallback(lambda ignored:
1814             mw.put_root_hash(self.root_hash))
1815         d.addCallback(lambda ignored:
1816             mw.put_signature(self.signature))
1817         d.addCallback(lambda ignored:
1818             mw.put_verification_key(self.verification_key))
1819         # Now try to put the signature again. This should fail
1820         d.addCallback(lambda ignored:
1821             self.shouldFail(LayoutInvalid, "signature after verification",
1822                             None,
1823                             mw.put_signature, self.signature))
1824         return d
1825
1826
1827     def test_uncoordinated_write(self):
1828         # Make two mutable writers, both pointing to the same storage
1829         # server, both at the same storage index, and try writing to the
1830         # same share.
1831         mw1 = self._make_new_mw("si1", 0)
1832         mw2 = self._make_new_mw("si1", 0)
1833
1834         def _check_success(results):
1835             result, readvs = results
1836             self.failUnless(result)
1837
1838         def _check_failure(results):
1839             result, readvs = results
1840             self.failIf(result)
1841
1842         def _write_share(mw):
1843             for i in xrange(6):
1844                 mw.put_block(self.block, i, self.salt)
1845             mw.put_encprivkey(self.encprivkey)
1846             mw.put_blockhashes(self.block_hash_tree)
1847             mw.put_sharehashes(self.share_hash_chain)
1848             mw.put_root_hash(self.root_hash)
1849             mw.put_signature(self.signature)
1850             mw.put_verification_key(self.verification_key)
1851             return mw.finish_publishing()
1852         d = _write_share(mw1)
1853         d.addCallback(_check_success)
1854         d.addCallback(lambda ignored:
1855             _write_share(mw2))
1856         d.addCallback(_check_failure)
1857         return d
1858
1859
1860     def test_invalid_salt_size(self):
1861         # Salts need to be 16 bytes in size. Writes that attempt to
1862         # write more or less than this should be rejected.
1863         mw = self._make_new_mw("si1", 0)
1864         invalid_salt = "a" * 17 # 17 bytes
1865         another_invalid_salt = "b" * 15 # 15 bytes
1866         d = defer.succeed(None)
1867         d.addCallback(lambda ignored:
1868             self.shouldFail(LayoutInvalid, "salt too big",
1869                             None,
1870                             mw.put_block, self.block, 0, invalid_salt))
1871         d.addCallback(lambda ignored:
1872             self.shouldFail(LayoutInvalid, "salt too small",
1873                             None,
1874                             mw.put_block, self.block, 0,
1875                             another_invalid_salt))
1876         return d
1877
1878
1879     def test_write_test_vectors(self):
1880         # If we give the write proxy a bogus test vector at 
1881         # any point during the process, it should fail to write when we 
1882         # tell it to write.
1883         def _check_failure(results):
1884             self.failUnlessEqual(len(results), 2)
1885             res, readv = results
1886             self.failIf(res)
1887
1888         def _check_success(results):
1889             self.failUnlessEqual(len(results), 2)
1890             res, readv = results
1891             self.failUnless(res)
1892
1893         mw = self._make_new_mw("si1", 0)
1894         mw.set_checkstring("this is a lie")
1895         for i in xrange(6):
1896             mw.put_block(self.block, i, self.salt)
1897         mw.put_encprivkey(self.encprivkey)
1898         mw.put_blockhashes(self.block_hash_tree)
1899         mw.put_sharehashes(self.share_hash_chain)
1900         mw.put_root_hash(self.root_hash)
1901         mw.put_signature(self.signature)
1902         mw.put_verification_key(self.verification_key)
1903         d = mw.finish_publishing()
1904         d.addCallback(_check_failure)
1905         d.addCallback(lambda ignored:
1906             mw.set_checkstring(""))
1907         d.addCallback(lambda ignored:
1908             mw.finish_publishing())
1909         d.addCallback(_check_success)
1910         return d
1911
1912
1913     def serialize_blockhashes(self, blockhashes):
1914         return "".join(blockhashes)
1915
1916
1917     def serialize_sharehashes(self, sharehashes):
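        # (Each entry packs a 2-byte share number and a 32-byte hash --
        # >H32s -- which is why test_write expects the serialized chain to
        # occupy (32 + 2) * 6 bytes.)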
1918         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1919                         for i in sorted(sharehashes.keys())])
1920         return ret
1921
1922
1923     def test_write(self):
1924         # This translates to a file with 6 6-byte segments, and with 2-byte
1925         # blocks.
1926         mw = self._make_new_mw("si1", 0)
1927         # Test writing some blocks.
1928         read = self.ss.remote_slot_readv
1929         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1930         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1931                                     PRIVATE_KEY_SIZE + \
1932                                     SIGNATURE_SIZE + \
1933                                     VERIFICATION_KEY_SIZE + \
1934                                     SHARE_HASH_CHAIN_SIZE
1935         written_block_size = 2 + len(self.salt)
1936         written_block = self.block + self.salt
1937         for i in xrange(6):
1938             mw.put_block(self.block, i, self.salt)
1939
1940         mw.put_encprivkey(self.encprivkey)
1941         mw.put_blockhashes(self.block_hash_tree)
1942         mw.put_sharehashes(self.share_hash_chain)
1943         mw.put_root_hash(self.root_hash)
1944         mw.put_signature(self.signature)
1945         mw.put_verification_key(self.verification_key)
1946         d = mw.finish_publishing()
1947         def _check_publish(results):
1948             self.failUnlessEqual(len(results), 2)
1949             result, ign = results
1950             self.failUnless(result, "publish failed")
1951             for i in xrange(6):
1952                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1953                                 {0: [written_block]})
1954
1955             self.failUnlessEqual(len(self.encprivkey), 7)
1956             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1957                                  {0: [self.encprivkey]})
1958
1959             expected_block_hash_offset = expected_sharedata_offset + \
1960                         (6 * written_block_size)
1961             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1962             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1963                                  {0: [self.block_hash_tree_s]})
1964
1965             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1966             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1967                                  {0: [self.share_hash_chain_s]})
1968
1969             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1970                                  {0: [self.root_hash]})
1971             expected_signature_offset = expected_share_hash_offset + \
1972                 len(self.share_hash_chain_s)
1973             self.failUnlessEqual(len(self.signature), 9)
1974             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1975                                  {0: [self.signature]})
1976
1977             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1978             self.failUnlessEqual(len(self.verification_key), 6)
1979             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1980                                  {0: [self.verification_key]})
1981
1982             signable = mw.get_signable()
1983             verno, seq, roothash, k, n, segsize, datalen = \
1984                                             struct.unpack(">BQ32sBBQQ",
1985                                                           signable)
1986             self.failUnlessEqual(verno, 1)
1987             self.failUnlessEqual(seq, 0)
1988             self.failUnlessEqual(roothash, self.root_hash)
1989             self.failUnlessEqual(k, 3)
1990             self.failUnlessEqual(n, 10)
1991             self.failUnlessEqual(segsize, 6)
1992             self.failUnlessEqual(datalen, 36)
1993             expected_eof_offset = expected_block_hash_offset + \
1994                 len(self.block_hash_tree_s)
1995
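            # (For reference: the fixed header prefix >BQ32sBBQQ is
            # 1 + 8 + 32 + 1 + 1 + 8 + 8 = 59 bytes, and the eight 8-byte
            # offsets follow it, so the reads below land at bytes 0, 1, 41,
            # 42, 43, 51, and then 59 + 8*i for the offset table.)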
1996             # Check the version number to make sure that it is correct.
1997             expected_version_number = struct.pack(">B", 1)
1998             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1999                                  {0: [expected_version_number]})
2000             # Check the sequence number to make sure that it is correct
2001             expected_sequence_number = struct.pack(">Q", 0)
2002             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2003                                  {0: [expected_sequence_number]})
2004             # Check that the encoding parameters (k, N, segment size, data
2005             # length) are what they should be. These are 3, 10, 6, 36.
2006             expected_k = struct.pack(">B", 3)
2007             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2008                                  {0: [expected_k]})
2009             expected_n = struct.pack(">B", 10)
2010             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2011                                  {0: [expected_n]})
2012             expected_segment_size = struct.pack(">Q", 6)
2013             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2014                                  {0: [expected_segment_size]})
2015             expected_data_length = struct.pack(">Q", 36)
2016             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2017                                  {0: [expected_data_length]})
2018             expected_offset = struct.pack(">Q", expected_private_key_offset)
2019             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2020                                  {0: [expected_offset]})
2021             expected_offset = struct.pack(">Q", expected_share_hash_offset)
2022             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2023                                  {0: [expected_offset]})
2024             expected_offset = struct.pack(">Q", expected_signature_offset)
2025             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2026                                  {0: [expected_offset]})
2027             expected_offset = struct.pack(">Q", expected_verification_key_offset)
2028             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2029                                  {0: [expected_offset]})
2030             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2031             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2032                                  {0: [expected_offset]})
2033             expected_offset = struct.pack(">Q", expected_sharedata_offset)
2034             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2035                                  {0: [expected_offset]})
2036             expected_offset = struct.pack(">Q", expected_block_hash_offset)
2037             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2038                                  {0: [expected_offset]})
2039             expected_offset = struct.pack(">Q", expected_eof_offset)
2040             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2041                                  {0: [expected_offset]})
2042         d.addCallback(_check_publish)
2043         return d
2044
2045     def _make_new_mw(self, si, share, datalength=36):
2046         # By default this is a file of size 36 bytes. Since it has a
2047         # segment size of 6, we know that it has six 6-byte segments,
2048         # which will be split into blocks of 2 bytes because our FEC k
2049         # parameter is 3.
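        # (A datalength of 33, for example, implies five full 6-byte
        # segments plus a 3-byte tail segment whose blocks are one byte
        # each -- see test_write_rejected_with_invalid_blocksize.)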
2050         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2051                                 6, datalength)
2052         return mw
2053
2054
2055     def test_write_rejected_with_too_many_blocks(self):
2056         mw = self._make_new_mw("si0", 0)
2057
2058         # Try writing too many blocks. We should not be able to write
2059         # more than 6 blocks into each share: valid segment numbers
2060         # here are 0 through 5.
2061         d = defer.succeed(None)
2062         for i in xrange(6):
2063             d.addCallback(lambda ignored, i=i:
2064                 mw.put_block(self.block, i, self.salt))
2065         d.addCallback(lambda ignored:
2066             self.shouldFail(LayoutInvalid, "too many blocks",
2067                             None,
2068                             mw.put_block, self.block, 7, self.salt))
2069         return d
2070
2071
2072     def test_write_rejected_with_invalid_salt(self):
2073         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2074         # less should cause an error.
2075         mw = self._make_new_mw("si1", 0)
2076         bad_salt = "a" * 17 # 17 bytes
2077         d = defer.succeed(None)
2078         d.addCallback(lambda ignored:
2079             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2080                             None, mw.put_block, self.block, 7, bad_salt))
2081         return d
2082
2083
2084     def test_write_rejected_with_invalid_root_hash(self):
2085         # Try writing an invalid root hash. This should be SHA256d, and
2086         # 32 bytes long as a result.
2087         mw = self._make_new_mw("si2", 0)
2088         # 17 bytes != 32 bytes
2089         invalid_root_hash = "a" * 17
2090         d = defer.succeed(None)
2091         # Before this test can work, we need to put some blocks + salts,
2092         # a block hash tree, and a share hash tree. Otherwise, we'll see
2093         # failures that match what we are looking for, but are caused by
2094         # the constraints imposed on operation ordering.
2095         for i in xrange(6):
2096             d.addCallback(lambda ignored, i=i:
2097                 mw.put_block(self.block, i, self.salt))
2098         d.addCallback(lambda ignored:
2099             mw.put_encprivkey(self.encprivkey))
2100         d.addCallback(lambda ignored:
2101             mw.put_blockhashes(self.block_hash_tree))
2102         d.addCallback(lambda ignored:
2103             mw.put_sharehashes(self.share_hash_chain))
2104         d.addCallback(lambda ignored:
2105             self.shouldFail(LayoutInvalid, "invalid root hash",
2106                             None, mw.put_root_hash, invalid_root_hash))
2107         return d
2108
2109
2110     def test_write_rejected_with_invalid_blocksize(self):
2111         # The blocksize implied by the writer that we get from
2112         # _make_new_mw is 2 bytes -- any more or any less than this
2113         # should be cause for failure, unless it is the tail segment, in
2114         # which case it may not be a failure.
2115         invalid_block = "a"
2116         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2117                                              # one byte blocks
2118         # 1 byte != 2 bytes
2119         d = defer.succeed(None)
2120         d.addCallback(lambda ignored, invalid_block=invalid_block:
2121             self.shouldFail(LayoutInvalid, "test blocksize too small",
2122                             None, mw.put_block, invalid_block, 0,
2123                             self.salt))
2124         invalid_block = invalid_block * 3
2125         # 3 bytes != 2 bytes
2126         d.addCallback(lambda ignored:
2127             self.shouldFail(LayoutInvalid, "test blocksize too large",
2128                             None,
2129                             mw.put_block, invalid_block, 0, self.salt))
2130         for i in xrange(5):
2131             d.addCallback(lambda ignored, i=i:
2132                 mw.put_block(self.block, i, self.salt))
2133         # Try to put an invalid tail segment
2134         d.addCallback(lambda ignored:
2135             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2136                             None,
2137                             mw.put_block, self.block, 5, self.salt))
2138         valid_block = "a"
2139         d.addCallback(lambda ignored:
2140             mw.put_block(valid_block, 5, self.salt))
2141         return d
2142
2143
2144     def test_write_enforces_order_constraints(self):
2145         # We require that the MDMFSlotWriteProxy be interacted with in a
2146         # specific way.
2147         # That way is:
2148         # 0: __init__
2149         # 1: write blocks and salts
2150         # 2: Write the encrypted private key
2151         # 3: Write the block hashes
2152         # 4: Write the share hashes
2153         # 5: Write the root hash and salt hash
2154         # 6: Write the signature and verification key
2155         # 7: Write the file.
2156         # 
2157         # Some of these can be performed out-of-order, and some can't.
2158         # The dependencies that I want to test here are:
2159         #  - Private key before block hashes
2160         #  - share hashes and block hashes before root hash
2161         #  - root hash before signature
2162         #  - signature before verification key
2163         mw0 = self._make_new_mw("si0", 0)
2164         # Write some shares
2165         d = defer.succeed(None)
2166         for i in xrange(6):
2167             d.addCallback(lambda ignored, i=i:
2168                 mw0.put_block(self.block, i, self.salt))
2169
2170         # Try to write the share hash chain without writing the
2171         # encrypted private key
2172         d.addCallback(lambda ignored:
2173             self.shouldFail(LayoutInvalid, "share hash chain before "
2174                                            "private key",
2175                             None,
2176                             mw0.put_sharehashes, self.share_hash_chain))
2177         # Write the private key.
2178         d.addCallback(lambda ignored:
2179             mw0.put_encprivkey(self.encprivkey))
2180
2181         # Now write the block hashes and try again
2182         d.addCallback(lambda ignored:
2183             mw0.put_blockhashes(self.block_hash_tree))
2184
2185         # We haven't yet put the root hash on the share, so we shouldn't
2186         # be able to sign it.
2187         d.addCallback(lambda ignored:
2188             self.shouldFail(LayoutInvalid, "signature before root hash",
2189                             None, mw0.put_signature, self.signature))
2190
2191         d.addCallback(lambda ignored:
2192             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2193
2194         # ...and, since that fails, we also shouldn't be able to put the
2195         # verification key.
2196         d.addCallback(lambda ignored:
2197             self.shouldFail(LayoutInvalid, "key before signature",
2198                             None, mw0.put_verification_key,
2199                             self.verification_key))
2200
2201         # Now write the share hashes.
2202         d.addCallback(lambda ignored:
2203             mw0.put_sharehashes(self.share_hash_chain))
2204         # We should be able to write the root hash now too
2205         d.addCallback(lambda ignored:
2206             mw0.put_root_hash(self.root_hash))
2207
2208         # We should still be unable to put the verification key
2209         d.addCallback(lambda ignored:
2210             self.shouldFail(LayoutInvalid, "key before signature",
2211                             None, mw0.put_verification_key,
2212                             self.verification_key))
2213
2214         d.addCallback(lambda ignored:
2215             mw0.put_signature(self.signature))
2216
2217         # We shouldn't be able to write the offsets to the remote server
2218         # until the offset table is finished; IOW, until we have written
2219         # the verification key.
2220         d.addCallback(lambda ignored:
2221             self.shouldFail(LayoutInvalid, "offsets before verification key",
2222                             None,
2223                             mw0.finish_publishing))
2224
2225         d.addCallback(lambda ignored:
2226             mw0.put_verification_key(self.verification_key))
2227         return d
2228
2229
2230     def test_end_to_end(self):
2231         mw = self._make_new_mw("si1", 0)
2232         # Write a share using the mutable writer, and make sure that the
2233         # reader knows how to read everything back to us.
2234         d = defer.succeed(None)
2235         for i in xrange(6):
2236             d.addCallback(lambda ignored, i=i:
2237                 mw.put_block(self.block, i, self.salt))
2238         d.addCallback(lambda ignored:
2239             mw.put_encprivkey(self.encprivkey))
2240         d.addCallback(lambda ignored:
2241             mw.put_blockhashes(self.block_hash_tree))
2242         d.addCallback(lambda ignored:
2243             mw.put_sharehashes(self.share_hash_chain))
2244         d.addCallback(lambda ignored:
2245             mw.put_root_hash(self.root_hash))
2246         d.addCallback(lambda ignored:
2247             mw.put_signature(self.signature))
2248         d.addCallback(lambda ignored:
2249             mw.put_verification_key(self.verification_key))
2250         d.addCallback(lambda ignored:
2251             mw.finish_publishing())
2252
2253         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2254         def _check_block_and_salt((block, salt)):
2255             self.failUnlessEqual(block, self.block)
2256             self.failUnlessEqual(salt, self.salt)
2257
2258         for i in xrange(6):
2259             d.addCallback(lambda ignored, i=i:
2260                 mr.get_block_and_salt(i))
2261             d.addCallback(_check_block_and_salt)
2262
2263         d.addCallback(lambda ignored:
2264             mr.get_encprivkey())
2265         d.addCallback(lambda encprivkey:
2266             self.failUnlessEqual(self.encprivkey, encprivkey))
2267
2268         d.addCallback(lambda ignored:
2269             mr.get_blockhashes())
2270         d.addCallback(lambda blockhashes:
2271             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2272
2273         d.addCallback(lambda ignored:
2274             mr.get_sharehashes())
2275         d.addCallback(lambda sharehashes:
2276             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2277
2278         d.addCallback(lambda ignored:
2279             mr.get_signature())
2280         d.addCallback(lambda signature:
2281             self.failUnlessEqual(signature, self.signature))
2282
2283         d.addCallback(lambda ignored:
2284             mr.get_verification_key())
2285         d.addCallback(lambda verification_key:
2286             self.failUnlessEqual(verification_key, self.verification_key))
2287
2288         d.addCallback(lambda ignored:
2289             mr.get_seqnum())
2290         d.addCallback(lambda seqnum:
2291             self.failUnlessEqual(seqnum, 0))
2292
2293         d.addCallback(lambda ignored:
2294             mr.get_root_hash())
2295         d.addCallback(lambda root_hash:
2296             self.failUnlessEqual(self.root_hash, root_hash))
2297
2298         d.addCallback(lambda ignored:
2299             mr.get_encoding_parameters())
2300         def _check_encoding_parameters((k, n, segsize, datalen)):
2301             self.failUnlessEqual(k, 3)
2302             self.failUnlessEqual(n, 10)
2303             self.failUnlessEqual(segsize, 6)
2304             self.failUnlessEqual(datalen, 36)
2305         d.addCallback(_check_encoding_parameters)
2306
2307         d.addCallback(lambda ignored:
2308             mr.get_checkstring())
2309         d.addCallback(lambda checkstring:
2310             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2311         return d
2312
2313
2314     def test_is_sdmf(self):
2315         # The MDMFSlotReadProxy should also know how to read SDMF files,
2316         # since it will encounter them on the grid. Callers use the
2317         # is_sdmf method to test this.
2318         self.write_sdmf_share_to_server("si1")
2319         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2320         d = mr.is_sdmf()
2321         d.addCallback(lambda issdmf:
2322             self.failUnless(issdmf))
2323         return d
2324
2325
2326     def test_reads_sdmf(self):
2327         # The slot read proxy should, naturally, know how to tell us
2328         # about data in the SDMF format
2329         self.write_sdmf_share_to_server("si1")
2330         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2331         d = defer.succeed(None)
2332         d.addCallback(lambda ignored:
2333             mr.is_sdmf())
2334         d.addCallback(lambda issdmf:
2335             self.failUnless(issdmf))
2336
2337         # What do we need to read?
2338         #  - The sharedata
2339         #  - The salt
2340         d.addCallback(lambda ignored:
2341             mr.get_block_and_salt(0))
2342         def _check_block_and_salt(results):
2343             block, salt = results
2344             # Our original file is 36 bytes long. Then each share is 12
2345             # bytes in size. The share is composed entirely of the
2346             # letter a. self.block contains 2 as, so 6 * self.block is
2347             # what we are looking for.
2348             self.failUnlessEqual(block, self.block * 6)
2349             self.failUnlessEqual(salt, self.salt)
2350         d.addCallback(_check_block_and_salt)
2351
2352         #  - The blockhashes
2353         d.addCallback(lambda ignored:
2354             mr.get_blockhashes())
2355         d.addCallback(lambda blockhashes:
2356             self.failUnlessEqual(self.block_hash_tree,
2357                                  blockhashes,
2358                                  blockhashes))
2359         #  - The sharehashes
2360         d.addCallback(lambda ignored:
2361             mr.get_sharehashes())
2362         d.addCallback(lambda sharehashes:
2363             self.failUnlessEqual(self.share_hash_chain,
2364                                  sharehashes))
2365         #  - The keys
2366         d.addCallback(lambda ignored:
2367             mr.get_encprivkey())
2368         d.addCallback(lambda encprivkey:
2369             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2370         d.addCallback(lambda ignored:
2371             mr.get_verification_key())
2372         d.addCallback(lambda verification_key:
2373             self.failUnlessEqual(verification_key,
2374                                  self.verification_key,
2375                                  verification_key))
2376         #  - The signature
2377         d.addCallback(lambda ignored:
2378             mr.get_signature())
2379         d.addCallback(lambda signature:
2380             self.failUnlessEqual(signature, self.signature, signature))
2381
2382         #  - The sequence number
2383         d.addCallback(lambda ignored:
2384             mr.get_seqnum())
2385         d.addCallback(lambda seqnum:
2386             self.failUnlessEqual(seqnum, 0, seqnum))
2387
2388         #  - The root hash
2389         d.addCallback(lambda ignored:
2390             mr.get_root_hash())
2391         d.addCallback(lambda root_hash:
2392             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2393         return d
2394
2395
2396     def test_only_reads_one_segment_sdmf(self):
2397         # SDMF shares have only one segment, so it doesn't make sense to
2398         # read more segments than that. The reader should know this and
2399         # complain if we try to do that.
2400         self.write_sdmf_share_to_server("si1")
2401         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2402         d = defer.succeed(None)
2403         d.addCallback(lambda ignored:
2404             mr.is_sdmf())
2405         d.addCallback(lambda issdmf:
2406             self.failUnless(issdmf))
2407         d.addCallback(lambda ignored:
2408             self.shouldFail(LayoutInvalid, "test bad segment",
2409                             None,
2410                             mr.get_block_and_salt, 1))
2411         return d
2412
2413
2414     def test_read_with_prefetched_mdmf_data(self):
2415         # The MDMFSlotReadProxy will prefill certain fields if you pass
2416         # it data that you have already fetched. This is useful for
2417         # cases like the Servermap, which prefetches ~2kb of data while
2418         # finding out which shares are on the remote peer so that it
2419         # doesn't waste round trips.
2420         mdmf_data = self.build_test_mdmf_share()
2421         self.write_test_share_to_server("si1")
2422         def _make_mr(ignored, length):
2423             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2424             return mr
2425
2426         d = defer.succeed(None)
2427         # This should be enough to fill in both the encoding parameters
2428         # and the table of offsets, which will complete the version
2429         # information tuple.
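        # (123 bytes is exactly the fixed MDMF header: the 59-byte signable
        # prefix plus the eight 8-byte offsets.)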
2430         d.addCallback(_make_mr, 123)
2431         d.addCallback(lambda mr:
2432             mr.get_verinfo())
2433         def _check_verinfo(verinfo):
2434             self.failUnless(verinfo)
2435             self.failUnlessEqual(len(verinfo), 9)
2436             (seqnum,
2437              root_hash,
2438              salt_hash,
2439              segsize,
2440              datalen,
2441              k,
2442              n,
2443              prefix,
2444              offsets) = verinfo
2445             self.failUnlessEqual(seqnum, 0)
2446             self.failUnlessEqual(root_hash, self.root_hash)
2447             self.failUnlessEqual(segsize, 6)
2448             self.failUnlessEqual(datalen, 36)
2449             self.failUnlessEqual(k, 3)
2450             self.failUnlessEqual(n, 10)
2451             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2452                                           1,
2453                                           seqnum,
2454                                           root_hash,
2455                                           k,
2456                                           n,
2457                                           segsize,
2458                                           datalen)
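                 # Judging by the ">BQ32s BBQQ" format used for the MDMF
                 # prefix below, MDMFSIGNABLEHEADER packs a 1-byte version
                 # (1 for MDMF), an 8-byte seqnum, the 32-byte root hash,
                 # 1-byte k and N, and 8-byte segment size and data length.
                 # The SDMF prefix (SIGNED_PREFIX) differs in using version 0
                 # and inserting a 16-byte salt after the root hash.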
2459             self.failUnlessEqual(expected_prefix, prefix)
2460             self.failUnlessEqual(self.rref.read_count, 0)
2461         d.addCallback(_check_verinfo)
2462         # This is not enough data to read a block and its salt, so the
2463         # wrapper should attempt to read this from the remote server.
2464         d.addCallback(_make_mr, 123)
2465         d.addCallback(lambda mr:
2466             mr.get_block_and_salt(0))
2467         def _check_block_and_salt((block, salt)):
2468             self.failUnlessEqual(block, self.block)
2469             self.failUnlessEqual(salt, self.salt)
2470             self.failUnlessEqual(self.rref.read_count, 1)
             d.addCallback(_check_block_and_salt) # verify this first read too
2471         # This should be enough data to read one block.
2472         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2473         d.addCallback(lambda mr:
2474             mr.get_block_and_salt(0))
2475         d.addCallback(_check_block_and_salt)
2476         return d
2477
2478
2479     def test_read_with_prefetched_sdmf_data(self):
2480         sdmf_data = self.build_test_sdmf_share()
2481         self.write_sdmf_share_to_server("si1")
2482         def _make_mr(ignored, length):
2483             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2484             return mr
2485
2486         d = defer.succeed(None)
2487         # This should be enough to get us the encoding parameters,
2488         # offset table, and everything else we need to build a verinfo
2489         # tuple.
2490         d.addCallback(_make_mr, 123)
2491         d.addCallback(lambda mr:
2492             mr.get_verinfo())
2493         def _check_verinfo(verinfo):
2494             self.failUnless(verinfo)
2495             self.failUnlessEqual(len(verinfo), 9)
2496             (seqnum,
2497              root_hash,
2498              salt,
2499              segsize,
2500              datalen,
2501              k,
2502              n,
2503              prefix,
2504              offsets) = verinfo
2505             self.failUnlessEqual(seqnum, 0)
2506             self.failUnlessEqual(root_hash, self.root_hash)
2507             self.failUnlessEqual(salt, self.salt)
2508             self.failUnlessEqual(segsize, 36)
2509             self.failUnlessEqual(datalen, 36)
2510             self.failUnlessEqual(k, 3)
2511             self.failUnlessEqual(n, 10)
2512             expected_prefix = struct.pack(SIGNED_PREFIX,
2513                                           0,
2514                                           seqnum,
2515                                           root_hash,
2516                                           salt,
2517                                           k,
2518                                           n,
2519                                           segsize,
2520                                           datalen)
2521             self.failUnlessEqual(expected_prefix, prefix)
2522             self.failUnlessEqual(self.rref.read_count, 0)
2523         d.addCallback(_check_verinfo)
2524         # This shouldn't be enough to read any share data.
2525         d.addCallback(_make_mr, 123)
2526         d.addCallback(lambda mr:
2527             mr.get_block_and_salt(0))
2528         def _check_block_and_salt((block, salt)):
2529             self.failUnlessEqual(block, self.block * 6)
2530             self.failUnlessEqual(salt, self.salt)
2531             # TODO: Fix the read routine so that it reads only the data
2532             #       that it has cached if it can't read all of it.
2533             self.failUnlessEqual(self.rref.read_count, 2)
             d.addCallback(_check_block_and_salt) # verify this first read too
2534
2535         # This should be enough to read share data.
2536         d.addCallback(_make_mr, self.offsets['share_data'])
2537         d.addCallback(lambda mr:
2538             mr.get_block_and_salt(0))
2539         d.addCallback(_check_block_and_salt)
2540         return d
2541
2542
2543     def test_read_with_empty_mdmf_file(self):
2544         # Some tests upload a file with no contents to test things
2545         # unrelated to the actual handling of the content of the file.
2546         # The reader should handle these gracefully: report the encoding
             # parameters, but refuse to read blocks that do not exist.
2547         self.write_test_share_to_server("si1", empty=True)
2548         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2549         # We should be able to get the encoding parameters, and they
2550         # should be correct.
2551         d = defer.succeed(None)
2552         d.addCallback(lambda ignored:
2553             mr.get_encoding_parameters())
2554         def _check_encoding_parameters(params):
2555             self.failUnlessEqual(len(params), 4)
2556             k, n, segsize, datalen = params
2557             self.failUnlessEqual(k, 3)
2558             self.failUnlessEqual(n, 10)
2559             self.failUnlessEqual(segsize, 0)
2560             self.failUnlessEqual(datalen, 0)
2561         d.addCallback(_check_encoding_parameters)
2562
2563         # We should not be able to fetch a block, since there are no
2564         # blocks to fetch
2565         d.addCallback(lambda ignored:
2566             self.shouldFail(LayoutInvalid, "get block on empty file",
2567                             None,
2568                             mr.get_block_and_salt, 0))
2569         return d
2570
2571
2572     def test_read_with_empty_sdmf_file(self):
2573         self.write_sdmf_share_to_server("si1", empty=True)
2574         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2575         # We should be able to get the encoding parameters, and they
2576         # should be correct
2577         d = defer.succeed(None)
2578         d.addCallback(lambda ignored:
2579             mr.get_encoding_parameters())
2580         def _check_encoding_parameters(params):
2581             self.failUnlessEqual(len(params), 4)
2582             k, n, segsize, datalen = params
2583             self.failUnlessEqual(k, 3)
2584             self.failUnlessEqual(n, 10)
2585             self.failUnlessEqual(segsize, 0)
2586             self.failUnlessEqual(datalen, 0)
2587         d.addCallback(_check_encoding_parameters)
2588
2589         # An empty file has no blocks, so trying to read one should
2590         # fail with LayoutInvalid.
2591         d.addCallback(lambda ignored:
2592             self.shouldFail(LayoutInvalid, "get block on an empty file",
2593                             None,
2594                             mr.get_block_and_salt, 0))
2595         return d
2596
2597
2598     def test_verinfo_with_sdmf_file(self):
2599         self.write_sdmf_share_to_server("si1")
2600         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2601         # We should be able to get the version information.
2602         d = defer.succeed(None)
2603         d.addCallback(lambda ignored:
2604             mr.get_verinfo())
2605         def _check_verinfo(verinfo):
2606             self.failUnless(verinfo)
2607             self.failUnlessEqual(len(verinfo), 9)
2608             (seqnum,
2609              root_hash,
2610              salt,
2611              segsize,
2612              datalen,
2613              k,
2614              n,
2615              prefix,
2616              offsets) = verinfo
2617             self.failUnlessEqual(seqnum, 0)
2618             self.failUnlessEqual(root_hash, self.root_hash)
2619             self.failUnlessEqual(salt, self.salt)
2620             self.failUnlessEqual(segsize, 36)
2621             self.failUnlessEqual(datalen, 36)
2622             self.failUnlessEqual(k, 3)
2623             self.failUnlessEqual(n, 10)
2624             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2625                                           0,
2626                                           seqnum,
2627                                           root_hash,
2628                                           salt,
2629                                           k,
2630                                           n,
2631                                           segsize,
2632                                           datalen)
2633             self.failUnlessEqual(prefix, expected_prefix)
2634             self.failUnlessEqual(offsets, self.offsets)
2635         d.addCallback(_check_verinfo)
2636         return d
2637
2638
2639     def test_verinfo_with_mdmf_file(self):
2640         self.write_test_share_to_server("si1")
2641         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2642         d = defer.succeed(None)
2643         d.addCallback(lambda ignored:
2644             mr.get_verinfo())
2645         def _check_verinfo(verinfo):
2646             self.failUnless(verinfo)
2647             self.failUnlessEqual(len(verinfo), 9)
2648             (seqnum,
2649              root_hash,
2650              IV,
2651              segsize,
2652              datalen,
2653              k,
2654              n,
2655              prefix,
2656              offsets) = verinfo
2657             self.failUnlessEqual(seqnum, 0)
2658             self.failUnlessEqual(root_hash, self.root_hash)
2659             self.failIf(IV)
2660             self.failUnlessEqual(segsize, 6)
2661             self.failUnlessEqual(datalen, 36)
2662             self.failUnlessEqual(k, 3)
2663             self.failUnlessEqual(n, 10)
2664             expected_prefix = struct.pack(">BQ32s BBQQ",
2665                                           1,
2666                                           seqnum,
2667                                           root_hash,
2668                                           k,
2669                                           n,
2670                                           segsize,
2671                                           datalen)
2672             self.failUnlessEqual(prefix, expected_prefix)
2673             self.failUnlessEqual(offsets, self.offsets)
2674         d.addCallback(_check_verinfo)
2675         return d
2676
2677
2678     def test_sdmf_writer(self):
2679         # Go through the motions of writing an SDMF share to the storage
2680         # server, then read back from the storage server to check that the
2681         # share was written the way we expect.
2682
2683         # We build the expected share first so that the necessary instance
2684         # variables are set the way the assertions below want them.
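             # Note the write protocol exercised here: put_block,
             # put_encprivkey, put_blockhashes, put_sharehashes,
             # put_root_hash, put_signature and put_verification_key are all
             # buffered locally, and nothing reaches the server until
             # finish_publishing() sends the whole share in a single write.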
2685         data = self.build_test_sdmf_share()
2686         sdmfr = SDMFSlotWriteProxy(0,
2687                                    self.rref,
2688                                    "si1",
2689                                    self.secrets,
2690                                    0, 3, 10, 36, 36)
2691         # Put the block and salt.
2692         sdmfr.put_block(self.blockdata, 0, self.salt)
2693
2694         # Put the encprivkey
2695         sdmfr.put_encprivkey(self.encprivkey)
2696
2697         # Put the block and share hash chains
2698         sdmfr.put_blockhashes(self.block_hash_tree)
2699         sdmfr.put_sharehashes(self.share_hash_chain)
2700         sdmfr.put_root_hash(self.root_hash)
2701
2702         # Put the signature
2703         sdmfr.put_signature(self.signature)
2704
2705         # Put the verification key
2706         sdmfr.put_verification_key(self.verification_key)
2707
2708         # Now check to make sure that nothing has been written yet.
2709         self.failUnlessEqual(self.rref.write_count, 0)
2710
2711         # Now finish publishing
2712         d = sdmfr.finish_publishing()
2713         def _then(ignored):
2714             self.failUnlessEqual(self.rref.write_count, 1)
2715             read = self.ss.remote_slot_readv
2716             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2717                                  {0: [data]})
2718         d.addCallback(_then)
2719         return d
2720
2721
2722     def test_sdmf_writer_preexisting_share(self):
2723         data = self.build_test_sdmf_share()
2724         self.write_sdmf_share_to_server("si1")
2725
2726         # Now there is a share on the storage server. To successfully
2727         # write, we need to set the checkstring correctly. When we
2728         # don't, no write should occur.
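             # The test-and-set flow exercised below is, in sketch:
             #
             #     d = sdmfw.finish_publishing()   # fails: empty checkstring
             #     # results = (False, read_data); the server's current
             #     # checkstring is read_data[0][0]
             #     sdmfw.set_checkstring(correct_checkstring)
             #     d = sdmfw.finish_publishing()   # now succeeds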
2729         sdmfw = SDMFSlotWriteProxy(0,
2730                                    self.rref,
2731                                    "si1",
2732                                    self.secrets,
2733                                    1, 3, 10, 36, 36)
2734         sdmfw.put_block(self.blockdata, 0, self.salt)
2735
2736         # Put the encprivkey
2737         sdmfw.put_encprivkey(self.encprivkey)
2738
2739         # Put the block and share hash chains
2740         sdmfw.put_blockhashes(self.block_hash_tree)
2741         sdmfw.put_sharehashes(self.share_hash_chain)
2742
2743         # Put the root hash
2744         sdmfw.put_root_hash(self.root_hash)
2745
2746         # Put the signature
2747         sdmfw.put_signature(self.signature)
2748
2749         # Put the verification key
2750         sdmfw.put_verification_key(self.verification_key)
2751
2752         # We shouldn't have a checkstring yet
2753         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2754
2755         d = sdmfw.finish_publishing()
2756         def _then(results):
2757             self.failIf(results[0])
2758             # this is the correct checkstring
2759             self._expected_checkstring = results[1][0][0]
2760             return self._expected_checkstring
2761
2762         d.addCallback(_then)
2763         d.addCallback(sdmfw.set_checkstring)
2764         d.addCallback(lambda ignored:
2765             sdmfw.get_checkstring())
2766         d.addCallback(lambda checkstring:
2767             self.failUnlessEqual(checkstring, self._expected_checkstring))
2768         d.addCallback(lambda ignored:
2769             sdmfw.finish_publishing())
2770         def _then_again(results):
2771             self.failUnless(results[0])
2772             read = self.ss.remote_slot_readv
2773             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2774                                  {0: [struct.pack(">Q", 1)]})
2775             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2776                                  {0: [data[9:]]})
2777         d.addCallback(_then_again)
2778         return d
2779
2780
2781 class Stats(unittest.TestCase):
2782
2783     def setUp(self):
2784         self.sparent = LoggingServiceParent()
2785         self._lease_secret = itertools.count()
2786     def tearDown(self):
2787         return self.sparent.stopService()
2788
2789     def workdir(self, name):
2790         basedir = os.path.join("storage", "Server", name)
2791         return basedir
2792
2793     def create(self, name):
2794         workdir = self.workdir(name)
2795         ss = StorageServer(workdir, "\x00" * 20)
2796         ss.setServiceParent(self.sparent)
2797         return ss
2798
2799     def test_latencies(self):
2800         ss = self.create("test_latencies")
2801         for i in range(10000):
2802             ss.add_latency("allocate", 1.0 * i)
2803         for i in range(1000):
2804             ss.add_latency("renew", 1.0 * i)
2805         for i in range(20):
2806             ss.add_latency("write", 1.0 * i)
2807         for i in range(10):
2808             ss.add_latency("cancel", 2.0 * i)
2809         ss.add_latency("get", 5.0)
2810
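             # A note on the expected numbers below, inferred from the
             # assertions rather than from the server internals: each category
             # appears to keep only its most recent 1000 samples, so
             # "allocate" retains the values 9000..9999. That makes its mean
             # (9000 + 9999) / 2 = 9499.5, its 50th percentile
             # samples[500] = 9500, and its 99.9th percentile
             # samples[999] = 9999. A percentile is reported as None when
             # there are too few samples for it to be meaningful (e.g. the
             # 99th percentile of 20 "write" samples), and a single "get"
             # event is not even enough to report a mean.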
2811         output = ss.get_latencies()
2812
2813         self.failUnlessEqual(sorted(output.keys()),
2814                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2815         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2816         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2817         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2818         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2819         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2820         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2821         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2822         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2823         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2824
2825         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2826         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2827         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2828         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2829         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2830         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2831         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2832         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2833         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2834
2835         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2836         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2837         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2838         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2839         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2840         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2841         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2842         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2843         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2844
2845         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2846         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2847         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2848         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2849         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2850         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2851         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2852         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2853         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2854
2855         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2856         self.failUnless(output["get"]["mean"] is None, output)
2857         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2858         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2859         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2860         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2861         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2862         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2863         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2864
2865 def remove_tags(s):
2866     s = re.sub(r'<[^>]*>', ' ', s)
2867     s = re.sub(r'\s+', ' ', s)
2868     return s
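     # For example: remove_tags("<h1>Storage Server Status</h1>") yields
     # " Storage Server Status " -- each tag becomes a single space, and runs
     # of whitespace are collapsed.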
2869
2870 class MyBucketCountingCrawler(BucketCountingCrawler):
2871     def finished_prefix(self, cycle, prefix):
2872         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2873         if self.hook_ds:
2874             d = self.hook_ds.pop(0)
2875             d.callback(None)
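     # hook_ds is a list of Deferreds supplied by the test (see
     # test_bucket_counter_eta below); finished_prefix() pops and fires one
     # per completed prefix, letting the test run assertions at known points
     # in the crawl.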
2876
2877 class MyStorageServer(StorageServer):
2878     def add_bucket_counter(self):
2879         statefile = os.path.join(self.storedir, "bucket_counter.state")
2880         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2881         self.bucket_counter.setServiceParent(self)
2882
2883 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2884
2885     def setUp(self):
2886         self.s = service.MultiService()
2887         self.s.startService()
2888     def tearDown(self):
2889         return self.s.stopService()
2890
2891     def test_bucket_counter(self):
2892         basedir = "storage/BucketCounter/bucket_counter"
2893         fileutil.make_dirs(basedir)
2894         ss = StorageServer(basedir, "\x00" * 20)
2895         # to make sure we capture the bucket-counting-crawler in the middle
2896         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2897         # also make it start sooner than usual.
2898         ss.bucket_counter.slow_start = 0
2899         orig_cpu_slice = ss.bucket_counter.cpu_slice
2900         ss.bucket_counter.cpu_slice = 0
2901         ss.setServiceParent(self.s)
2902
2903         w = StorageStatus(ss)
2904
2905         # this sample is before the crawler has started doing anything
2906         html = w.renderSynchronously()
2907         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2908         s = remove_tags(html)
2909         self.failUnlessIn("Accepting new shares: Yes", s)
2910         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2911         self.failUnlessIn("Total buckets: Not computed yet", s)
2912         self.failUnlessIn("Next crawl in", s)
2913
2914         # give the bucket-counting-crawler one tick to get started. The
2915         # cpu_slice=0 will force it to yield right after it processes the
2916         # first prefix
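             # The pattern below polls the crawler: _check reschedules itself
             # with fireEventually() until the first prefix has been
             # completed, then runs its assertions.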
2917
2918         d = fireEventually()
2919         def _check(ignored):
2920             # are we really right after the first prefix?
2921             state = ss.bucket_counter.get_state()
2922             if state["last-complete-prefix"] is None:
2923                 d2 = fireEventually()
2924                 d2.addCallback(_check)
2925                 return d2
2926             self.failUnlessEqual(state["last-complete-prefix"],
2927                                  ss.bucket_counter.prefixes[0])
2928             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2929             html = w.renderSynchronously()
2930             s = remove_tags(html)
2931             self.failUnlessIn(" Current crawl ", s)
2932             self.failUnlessIn(" (next work in ", s)
2933         d.addCallback(_check)
2934
2935         # now give it enough time to complete a full cycle
2936         def _watch():
2937             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2938         d.addCallback(lambda ignored: self.poll(_watch))
2939         def _check2(ignored):
2940             ss.bucket_counter.cpu_slice = orig_cpu_slice
2941             html = w.renderSynchronously()
2942             s = remove_tags(html)
2943             self.failUnlessIn("Total buckets: 0 (the number of", s)
2944             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2945         d.addCallback(_check2)
2946         return d
2947
2948     def test_bucket_counter_cleanup(self):
2949         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2950         fileutil.make_dirs(basedir)
2951         ss = StorageServer(basedir, "\x00" * 20)
2952         # to make sure we capture the bucket-counting-crawler in the middle
2953         # of a cycle, we reach in and reduce its maximum slice time to 0.
2954         ss.bucket_counter.slow_start = 0
2955         orig_cpu_slice = ss.bucket_counter.cpu_slice
2956         ss.bucket_counter.cpu_slice = 0
2957         ss.setServiceParent(self.s)
2958
2959         d = fireEventually()
2960
2961         def _after_first_prefix(ignored):
2962             state = ss.bucket_counter.state
2963             if state["last-complete-prefix"] is None:
2964                 d2 = fireEventually()
2965                 d2.addCallback(_after_first_prefix)
2966                 return d2
2967             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2968             # now sneak in and mess with its state, to make sure it cleans up
2969             # properly at the end of the cycle
2970             self.failUnlessEqual(state["last-complete-prefix"],
2971                                  ss.bucket_counter.prefixes[0])
2972             state["bucket-counts"][-12] = {}
2973             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2974             ss.bucket_counter.save_state()
2975         d.addCallback(_after_first_prefix)
2976
2977         # now give it enough time to complete a cycle
2978         def _watch():
2979             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2980         d.addCallback(lambda ignored: self.poll(_watch))
2981         def _check2(ignored):
2982             ss.bucket_counter.cpu_slice = orig_cpu_slice
2983             s = ss.bucket_counter.get_state()
2984             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2985             self.failIf("bogusprefix!" in s["storage-index-samples"],
2986                         s["storage-index-samples"].keys())
2987         d.addCallback(_check2)
2988         return d
2989
2990     def test_bucket_counter_eta(self):
2991         basedir = "storage/BucketCounter/bucket_counter_eta"
2992         fileutil.make_dirs(basedir)
2993         ss = MyStorageServer(basedir, "\x00" * 20)
2994         ss.bucket_counter.slow_start = 0
2995         # these will be fired inside finished_prefix()
2996         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
2997         w = StorageStatus(ss)
2998
2999         d = defer.Deferred()
3000
3001         def _check_1(ignored):
3002             # no ETA is available yet
3003             html = w.renderSynchronously()
3004             s = remove_tags(html)
3005             self.failUnlessIn("complete (next work", s)
3006
3007         def _check_2(ignored):
3008             # one prefix has finished, so an ETA based upon that elapsed time
3009             # should be available.
3010             html = w.renderSynchronously()
3011             s = remove_tags(html)
3012             self.failUnlessIn("complete (ETA ", s)
3013
3014         def _check_3(ignored):
3015             # two prefixes have finished
3016             html = w.renderSynchronously()
3017             s = remove_tags(html)
3018             self.failUnlessIn("complete (ETA ", s)
3019             d.callback("done")
3020
3021         hooks[0].addCallback(_check_1).addErrback(d.errback)
3022         hooks[1].addCallback(_check_2).addErrback(d.errback)
3023         hooks[2].addCallback(_check_3).addErrback(d.errback)
3024
3025         ss.setServiceParent(self.s)
3026         return d
3027
3028 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3029     stop_after_first_bucket = False
3030     def process_bucket(self, *args, **kwargs):
3031         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3032         if self.stop_after_first_bucket:
3033             self.stop_after_first_bucket = False
3034             self.cpu_slice = -1.0
3035     def yielding(self, sleep_time):
3036         if not self.stop_after_first_bucket:
3037             self.cpu_slice = 500
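     # Setting cpu_slice to -1.0 above makes the crawler yield immediately
     # after the first bucket is processed; yielding() then restores a large
     # slice so the remainder of the cycle completes quickly.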
3038
3039 class BrokenStatResults:
3040     pass
3041 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3042     def stat(self, fn):
3043         s = os.stat(fn)
3044         bsr = BrokenStatResults()
3045         for attrname in dir(s):
3046             if attrname.startswith("_"):
3047                 continue
3048             if attrname == "st_blocks":
3049                 continue
3050             setattr(bsr, attrname, getattr(s, attrname))
3051         return bsr
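     # This copies every public attribute of the real stat result except
     # st_blocks, simulating a platform whose stat() results lack that field,
     # so the crawler's disk-usage accounting has to cope without it.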
3052
3053 class InstrumentedStorageServer(StorageServer):
3054     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3055 class No_ST_BLOCKS_StorageServer(StorageServer):
3056     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3057
3058 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3059
3060     def setUp(self):
3061         self.s = service.MultiService()
3062         self.s.startService()
3063     def tearDown(self):
3064         return self.s.stopService()
3065
3066     def make_shares(self, ss):
3067         def make(si):
3068             return (si, hashutil.tagged_hash("renew", si),
3069                     hashutil.tagged_hash("cancel", si))
3070         def make_mutable(si):
3071             return (si, hashutil.tagged_hash("renew", si),
3072                     hashutil.tagged_hash("cancel", si),
3073                     hashutil.tagged_hash("write-enabler", si))
3074         def make_extra_lease(si, num):
3075             return (hashutil.tagged_hash("renew-%d" % num, si),
3076                     hashutil.tagged_hash("cancel-%d" % num, si))
3077
3078         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3079         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3080         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3081         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3082         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3083         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3084         sharenums = [0]
3085         canary = FakeCanary()
3086         # note: 'tahoe debug dump-share' will not handle this file, since the
3087         # inner contents are not a valid CHK share
3088         data = "\xff" * 1000
3089
3090         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3091                                          1000, canary)
3092         w[0].remote_write(0, data)
3093         w[0].remote_close()
3094
3095         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3096                                          1000, canary)
3097         w[0].remote_write(0, data)
3098         w[0].remote_close()
3099         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3100
3101         writev = ss.remote_slot_testv_and_readv_and_writev
3102         writev(mutable_si_2, (we2, rs2, cs2),
3103                {0: ([], [(0,data)], len(data))}, [])
3104         writev(mutable_si_3, (we3, rs3, cs3),
3105                {0: ([], [(0,data)], len(data))}, [])
3106         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3107
3108         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3109         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3110         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
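             # To summarize the fixture: four buckets with one share each.
             # immutable_si_0 and mutable_si_2 carry a single lease, while
             # immutable_si_1 and mutable_si_3 carry two (the original plus
             # the extra lease added above).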
3111
3112     def test_basic(self):
3113         basedir = "storage/LeaseCrawler/basic"
3114         fileutil.make_dirs(basedir)
3115         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3116         # make it start sooner than usual.
3117         lc = ss.lease_checker
3118         lc.slow_start = 0
3119         lc.cpu_slice = 500
3120         lc.stop_after_first_bucket = True
3121         webstatus = StorageStatus(ss)
3122
3123         # create a few shares, with some leases on them
3124         self.make_shares(ss)
3125         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3126
3127         # add a non-sharefile to exercise another code path
3128         fn = os.path.join(ss.sharedir,
3129                           storage_index_to_dir(immutable_si_0),
3130                           "not-a-share")
3131         f = open(fn, "wb")
3132         f.write("I am not a share.\n")
3133         f.close()
3134
3135         # this is before the crawl has started, so we're not in a cycle yet
3136         initial_state = lc.get_state()
3137         self.failIf(lc.get_progress()["cycle-in-progress"])
3138         self.failIfIn("cycle-to-date", initial_state)
3139         self.failIfIn("estimated-remaining-cycle", initial_state)
3140         self.failIfIn("estimated-current-cycle", initial_state)
3141         self.failUnlessIn("history", initial_state)
3142         self.failUnlessEqual(initial_state["history"], {})
3143
3144         ss.setServiceParent(self.s)
3145
3146         DAY = 24*60*60
3147
3148         d = fireEventually()
3149
3150         # now examine the state right after the first bucket has been
3151         # processed.
3152         def _after_first_bucket(ignored):
3153             initial_state = lc.get_state()
3154             if "cycle-to-date" not in initial_state:
3155                 d2 = fireEventually()
3156                 d2.addCallback(_after_first_bucket)
3157                 return d2
3158             self.failUnlessIn("cycle-to-date", initial_state)
3159             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3160             self.failUnlessIn("estimated-current-cycle", initial_state)
3161             self.failUnlessIn("history", initial_state)
3162             self.failUnlessEqual(initial_state["history"], {})
3163
3164             so_far = initial_state["cycle-to-date"]
3165             self.failUnlessEqual(so_far["expiration-enabled"], False)
3166             self.failUnlessIn("configured-expiration-mode", so_far)
3167             self.failUnlessIn("lease-age-histogram", so_far)
3168             lah = so_far["lease-age-histogram"]
3169             self.failUnlessEqual(type(lah), list)
3170             self.failUnlessEqual(len(lah), 1)
3171             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
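                 # Each histogram entry appears to be a (min-age, max-age,
                 # count) bucket: here, one lease younger than a day.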
3172             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3173             self.failUnlessEqual(so_far["corrupt-shares"], [])
3174             sr1 = so_far["space-recovered"]
3175             self.failUnlessEqual(sr1["examined-buckets"], 1)
3176             self.failUnlessEqual(sr1["examined-shares"], 1)
3177             self.failUnlessEqual(sr1["actual-shares"], 0)
3178             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3179             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3180             left = initial_state["estimated-remaining-cycle"]
3181             sr2 = left["space-recovered"]
3182             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3183             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3184             self.failIfEqual(sr2["actual-shares"], None)
3185             self.failIfEqual(sr2["configured-diskbytes"], None)
3186             self.failIfEqual(sr2["original-sharebytes"], None)
3187         d.addCallback(_after_first_bucket)
3188         d.addCallback(lambda ign: self.render1(webstatus))
3189         def _check_html_in_cycle(html):
3190             s = remove_tags(html)
3191             self.failUnlessIn("So far, this cycle has examined "
3192                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3193             self.failUnlessIn("and has recovered: "
3194                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3195                               "0 B (0 B / 0 B)", s)
3196             self.failUnlessIn("If expiration were enabled, "
3197                               "we would have recovered: "
3198                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3199                               " 0 B (0 B / 0 B) by now", s)
3200             self.failUnlessIn("and the remainder of this cycle "
3201                               "would probably recover: "
3202                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3203                               " 0 B (0 B / 0 B)", s)
3204             self.failUnlessIn("and the whole cycle would probably recover: "
3205                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3206                               " 0 B (0 B / 0 B)", s)
3207             self.failUnlessIn("if we were strictly using each lease's default "
3208                               "31-day lease lifetime", s)
3209             self.failUnlessIn("this cycle would be expected to recover: ", s)
3210         d.addCallback(_check_html_in_cycle)
3211
3212         # wait for the crawler to finish the first cycle. Nothing should have
3213         # been removed.
3214         def _wait():
3215             return bool(lc.get_state()["last-cycle-finished"] is not None)
3216         d.addCallback(lambda ign: self.poll(_wait))
3217
3218         def _after_first_cycle(ignored):
3219             s = lc.get_state()
3220             self.failIf("cycle-to-date" in s)
3221             self.failIf("estimated-remaining-cycle" in s)
3222             self.failIf("estimated-current-cycle" in s)
3223             last = s["history"][0]
3224             self.failUnlessIn("cycle-start-finish-times", last)
3225             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3226             self.failUnlessEqual(last["expiration-enabled"], False)
3227             self.failUnlessIn("configured-expiration-mode", last)
3228
3229             self.failUnlessIn("lease-age-histogram", last)
3230             lah = last["lease-age-histogram"]
3231             self.failUnlessEqual(type(lah), list)
3232             self.failUnlessEqual(len(lah), 1)
3233             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3234
3235             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3236             self.failUnlessEqual(last["corrupt-shares"], [])
3237
3238             rec = last["space-recovered"]
3239             self.failUnlessEqual(rec["examined-buckets"], 4)
3240             self.failUnlessEqual(rec["examined-shares"], 4)
3241             self.failUnlessEqual(rec["actual-buckets"], 0)
3242             self.failUnlessEqual(rec["original-buckets"], 0)
3243             self.failUnlessEqual(rec["configured-buckets"], 0)
3244             self.failUnlessEqual(rec["actual-shares"], 0)
3245             self.failUnlessEqual(rec["original-shares"], 0)
3246             self.failUnlessEqual(rec["configured-shares"], 0)
3247             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3248             self.failUnlessEqual(rec["original-diskbytes"], 0)
3249             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3250             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3251             self.failUnlessEqual(rec["original-sharebytes"], 0)
3252             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3253
3254             def _get_sharefile(si):
3255                 return list(ss._iter_share_files(si))[0]
3256             def count_leases(si):
3257                 return len(list(_get_sharefile(si).get_leases()))
3258             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3259             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3260             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3261             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3262         d.addCallback(_after_first_cycle)
3263         d.addCallback(lambda ign: self.render1(webstatus))
3264         def _check_html(html):
3265             s = remove_tags(html)
3266             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3267                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3268             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3269                               "(2 mutable / 2 immutable),", s)
3270             self.failUnlessIn("but expiration was not enabled", s)
3271         d.addCallback(_check_html)
3272         d.addCallback(lambda ign: self.render_json(webstatus))
3273         def _check_json(json):
3274             data = simplejson.loads(json)
3275             self.failUnlessIn("lease-checker", data)
3276             self.failUnlessIn("lease-checker-progress", data)
3277         d.addCallback(_check_json)
3278         return d
3279
3280     def backdate_lease(self, sf, renew_secret, new_expire_time):
3281         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3282         # "renew" a lease with a new_expire_time that is older than what the
3283         # current lease has), so we have to reach inside it.
3284         for i,lease in enumerate(sf.get_leases()):
3285             if lease.renew_secret == renew_secret:
3286                 lease.expiration_time = new_expire_time
3287                 f = open(sf.home, 'rb+')
3288                 sf._write_lease_record(f, i, lease)
3289                 f.close()
3290                 return
3291         raise IndexError("unable to renew non-existent lease")
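         # Note that leases record only an expiration time; tests below that
         # want a lease to look N seconds old set its expiration time to
         # now - N + 31 days, because the grant/renew time is inferred by
         # subtracting the default 31-day lease duration.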
3292
3293     def test_expire_age(self):
3294         basedir = "storage/LeaseCrawler/expire_age"
3295         fileutil.make_dirs(basedir)
3296         # setting expiration_override_lease_duration to 2000 means that any
3297         # lease which is more than 2000s old will be expired.
3298         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3299                                        expiration_enabled=True,
3300                                        expiration_mode="age",
3301                                        expiration_override_lease_duration=2000)
3302         # make it start sooner than usual.
3303         lc = ss.lease_checker
3304         lc.slow_start = 0
3305         lc.stop_after_first_bucket = True
3306         webstatus = StorageStatus(ss)
3307
3308         # create a few shares, with some leases on them
3309         self.make_shares(ss)
3310         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3311
3312         def count_shares(si):
3313             return len(list(ss._iter_share_files(si)))
3314         def _get_sharefile(si):
3315             return list(ss._iter_share_files(si))[0]
3316         def count_leases(si):
3317             return len(list(_get_sharefile(si).get_leases()))
3318
3319         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3320         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3321         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3322         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3323         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3324         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3325         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3326         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3327
3328         # artificially crank back the expiration time on the first lease of
3329         # each share, to make it look like it expired 1000s ago. Some shares
3330         # have an extra lease which is set to expire at the default time, 31
3331         # days from now. We then run the crawler, which will expire the first
3332         # lease, making some shares get deleted and others stay alive (with
3333         # one remaining lease).
3334         now = time.time()
3335
3336         sf0 = _get_sharefile(immutable_si_0)
3337         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3338         sf0_size = os.stat(sf0.home).st_size
3339
3340         # immutable_si_1 gets an extra lease
3341         sf1 = _get_sharefile(immutable_si_1)
3342         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3343
3344         sf2 = _get_sharefile(mutable_si_2)
3345         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3346         sf2_size = os.stat(sf2.home).st_size
3347
3348         # mutable_si_3 gets an extra lease
3349         sf3 = _get_sharefile(mutable_si_3)
3350         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3351
3352         ss.setServiceParent(self.s)
3353
3354         d = fireEventually()
3355         # examine the state right after the first bucket has been processed
3356         def _after_first_bucket(ignored):
3357             p = lc.get_progress()
3358             if not p["cycle-in-progress"]:
3359                 d2 = fireEventually()
3360                 d2.addCallback(_after_first_bucket)
3361                 return d2
3362         d.addCallback(_after_first_bucket)
3363         d.addCallback(lambda ign: self.render1(webstatus))
3364         def _check_html_in_cycle(html):
3365             s = remove_tags(html)
3366             # the first bucket encountered gets deleted, and its prefix
3367             # happens to be about 1/5th of the way through the ring, so the
3368             # predictor thinks we'll have 5 shares and that we'll delete them
3369             # all. This part of the test depends upon the SIs landing right
3370             # where they do now.
3371             self.failUnlessIn("The remainder of this cycle is expected to "
3372                               "recover: 4 shares, 4 buckets", s)
3373             self.failUnlessIn("The whole cycle is expected to examine "
3374                               "5 shares in 5 buckets and to recover: "
3375                               "5 shares, 5 buckets", s)
3376         d.addCallback(_check_html_in_cycle)
3377
3378         # wait for the crawler to finish the first cycle. Two shares should
3379         # have been removed
3380         def _wait():
3381             return bool(lc.get_state()["last-cycle-finished"] is not None)
3382         d.addCallback(lambda ign: self.poll(_wait))
3383
3384         def _after_first_cycle(ignored):
3385             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3386             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3387             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3388             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3389             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3390             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3391
3392             s = lc.get_state()
3393             last = s["history"][0]
3394
3395             self.failUnlessEqual(last["expiration-enabled"], True)
3396             self.failUnlessEqual(last["configured-expiration-mode"],
3397                                  ("age", 2000, None, ("mutable", "immutable")))
3398             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3399
3400             rec = last["space-recovered"]
3401             self.failUnlessEqual(rec["examined-buckets"], 4)
3402             self.failUnlessEqual(rec["examined-shares"], 4)
3403             self.failUnlessEqual(rec["actual-buckets"], 2)
3404             self.failUnlessEqual(rec["original-buckets"], 2)
3405             self.failUnlessEqual(rec["configured-buckets"], 2)
3406             self.failUnlessEqual(rec["actual-shares"], 2)
3407             self.failUnlessEqual(rec["original-shares"], 2)
3408             self.failUnlessEqual(rec["configured-shares"], 2)
3409             size = sf0_size + sf2_size
3410             self.failUnlessEqual(rec["actual-sharebytes"], size)
3411             self.failUnlessEqual(rec["original-sharebytes"], size)
3412             self.failUnlessEqual(rec["configured-sharebytes"], size)
3413             # different platforms have different notions of "blocks used by
3414             # this file", so merely assert that it's a number
3415             self.failUnless(rec["actual-diskbytes"] >= 0,
3416                             rec["actual-diskbytes"])
3417             self.failUnless(rec["original-diskbytes"] >= 0,
3418                             rec["original-diskbytes"])
3419             self.failUnless(rec["configured-diskbytes"] >= 0,
3420                             rec["configured-diskbytes"])
3421         d.addCallback(_after_first_cycle)
3422         d.addCallback(lambda ign: self.render1(webstatus))
3423         def _check_html(html):
3424             s = remove_tags(html)
3425             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3426             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3427             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3428         d.addCallback(_check_html)
3429         return d
3430
3431     def test_expire_cutoff_date(self):
3432         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3433         fileutil.make_dirs(basedir)
3434         # setting cutoff-date to 2000 seconds ago means that any lease which
3435         # is more than 2000s old will be expired.
3436         now = time.time()
3437         then = int(now - 2000)
3438         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3439                                        expiration_enabled=True,
3440                                        expiration_mode="cutoff-date",
3441                                        expiration_cutoff_date=then)
3442         # make it start sooner than usual.
3443         lc = ss.lease_checker
3444         lc.slow_start = 0
3445         lc.stop_after_first_bucket = True
3446         webstatus = StorageStatus(ss)
3447
3448         # create a few shares, with some leases on them
3449         self.make_shares(ss)
3450         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3451
3452         def count_shares(si):
3453             return len(list(ss._iter_share_files(si)))
3454         def _get_sharefile(si):
3455             return list(ss._iter_share_files(si))[0]
3456         def count_leases(si):
3457             return len(list(_get_sharefile(si).get_leases()))
3458
3459         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3460         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3461         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3462         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3463         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3464         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3465         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3466         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3467
3468         # artificially crank back the expiration time on the first lease of
3469         # each share, to make it look like it was renewed 3000s ago. To achieve
3470         # this, we need to set the expiration time to now-3000+31days. This
3471         # will change when the lease format is improved to contain both
3472         # create/renew time and duration.
3473         new_expiration_time = now - 3000 + 31*24*60*60
3474
3475         # Some shares have an extra lease which is set to expire at the
3476         # default time, 31 days from now. We then run the crawler, which
3477         # will expire the first lease, making some shares get deleted and
3478         # others stay alive (with one remaining lease).
3479
3480         sf0 = _get_sharefile(immutable_si_0)
3481         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3482         sf0_size = os.stat(sf0.home).st_size
3483
3484         # immutable_si_1 gets an extra lease
3485         sf1 = _get_sharefile(immutable_si_1)
3486         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3487
3488         sf2 = _get_sharefile(mutable_si_2)
3489         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3490         sf2_size = os.stat(sf2.home).st_size
3491
3492         # mutable_si_3 gets an extra lease
3493         sf3 = _get_sharefile(mutable_si_3)
3494         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3495
3496         ss.setServiceParent(self.s)
3497
3498         d = fireEventually()
3499         # examine the state right after the first bucket has been processed
3500         def _after_first_bucket(ignored):
3501             p = lc.get_progress()
3502             if not p["cycle-in-progress"]:
3503                 d2 = fireEventually()
3504                 d2.addCallback(_after_first_bucket)
3505                 return d2
3506         d.addCallback(_after_first_bucket)
3507         d.addCallback(lambda ign: self.render1(webstatus))
3508         def _check_html_in_cycle(html):
3509             s = remove_tags(html)
3510             # the first bucket encountered gets deleted, and its prefix
3511             # happens to be about 1/5th of the way through the ring, so the
3512             # predictor thinks we'll have 5 shares and that we'll delete them
3513             # all. This part of the test depends upon the SIs landing right
3514             # where they do now.
3515             self.failUnlessIn("The remainder of this cycle is expected to "
3516                               "recover: 4 shares, 4 buckets", s)
3517             self.failUnlessIn("The whole cycle is expected to examine "
3518                               "5 shares in 5 buckets and to recover: "
3519                               "5 shares, 5 buckets", s)
3520         d.addCallback(_check_html_in_cycle)
3521
3522         # wait for the crawler to finish the first cycle. Two shares should
3523         # have been removed
3524         def _wait():
3525             return bool(lc.get_state()["last-cycle-finished"] is not None)
3526         d.addCallback(lambda ign: self.poll(_wait))
3527
3528         def _after_first_cycle(ignored):
3529             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3530             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3531             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3532             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3533             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3534             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3535
3536             s = lc.get_state()
3537             last = s["history"][0]
3538
3539             self.failUnlessEqual(last["expiration-enabled"], True)
3540             self.failUnlessEqual(last["configured-expiration-mode"],
3541                                  ("cutoff-date", None, then,
3542                                   ("mutable", "immutable")))
3543             self.failUnlessEqual(last["leases-per-share-histogram"],
3544                                  {1: 2, 2: 2})
3545
3546             rec = last["space-recovered"]
3547             self.failUnlessEqual(rec["examined-buckets"], 4)
3548             self.failUnlessEqual(rec["examined-shares"], 4)
3549             self.failUnlessEqual(rec["actual-buckets"], 2)
3550             self.failUnlessEqual(rec["original-buckets"], 0)
3551             self.failUnlessEqual(rec["configured-buckets"], 2)
3552             self.failUnlessEqual(rec["actual-shares"], 2)
3553             self.failUnlessEqual(rec["original-shares"], 0)
3554             self.failUnlessEqual(rec["configured-shares"], 2)
3555             size = sf0_size + sf2_size
3556             self.failUnlessEqual(rec["actual-sharebytes"], size)
3557             self.failUnlessEqual(rec["original-sharebytes"], 0)
3558             self.failUnlessEqual(rec["configured-sharebytes"], size)
3559             # different platforms have different notions of "blocks used by
3560             # this file", so merely assert that it's a number
3561             self.failUnless(rec["actual-diskbytes"] >= 0,
3562                             rec["actual-diskbytes"])
3563             self.failUnless(rec["original-diskbytes"] >= 0,
3564                             rec["original-diskbytes"])
3565             self.failUnless(rec["configured-diskbytes"] >= 0,
3566                             rec["configured-diskbytes"])
3567         d.addCallback(_after_first_cycle)
3568         d.addCallback(lambda ign: self.render1(webstatus))
3569         def _check_html(html):
3570             s = remove_tags(html)
3571             self.failUnlessIn("Expiration Enabled:"
3572                               " expired leases will be removed", s)
3573             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3574             substr = "Leases created or last renewed before %s will be considered expired." % date
3575             self.failUnlessIn(substr, s)
3576             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3577         d.addCallback(_check_html)
3578         return d
3579
3580     def test_only_immutable(self):
3581         basedir = "storage/LeaseCrawler/only_immutable"
3582         fileutil.make_dirs(basedir)
3583         now = time.time()
3584         then = int(now - 2000)
3585         ss = StorageServer(basedir, "\x00" * 20,
3586                            expiration_enabled=True,
3587                            expiration_mode="cutoff-date",
3588                            expiration_cutoff_date=then,
3589                            expiration_sharetypes=("immutable",))
3590         lc = ss.lease_checker
3591         lc.slow_start = 0
3592         webstatus = StorageStatus(ss)
3593
3594         self.make_shares(ss)
3595         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3596         # set all leases to be expirable
3597         new_expiration_time = now - 3000 + 31*24*60*60
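        # a renewed lease normally lasts 31 days, so an expiration time of
        # (now - 3000) + 31 days is what a lease renewed 3000 seconds ago
        # would carry; that renewal predates the cutoff of now - 2000 above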
3598
3599         def count_shares(si):
3600             return len(list(ss._iter_share_files(si)))
3601         def _get_sharefile(si):
3602             return list(ss._iter_share_files(si))[0]
3603         def count_leases(si):
3604             return len(list(_get_sharefile(si).get_leases()))
3605
3606         sf0 = _get_sharefile(immutable_si_0)
3607         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3608         sf1 = _get_sharefile(immutable_si_1)
3609         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3610         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3611         sf2 = _get_sharefile(mutable_si_2)
3612         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3613         sf3 = _get_sharefile(mutable_si_3)
3614         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3615         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3616
3617         ss.setServiceParent(self.s)
3618         def _wait():
3619             return lc.get_state()["last-cycle-finished"] is not None
3620         d = self.poll(_wait)
3621
3622         def _after_first_cycle(ignored):
3623             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3624             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3625             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3626             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3627             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3628             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3629         d.addCallback(_after_first_cycle)
3630         d.addCallback(lambda ign: self.render1(webstatus))
3631         def _check_html(html):
3632             s = remove_tags(html)
3633             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3634         d.addCallback(_check_html)
3635         return d
3636
3637     def test_only_mutable(self):
3638         basedir = "storage/LeaseCrawler/only_mutable"
3639         fileutil.make_dirs(basedir)
3640         now = time.time()
3641         then = int(now - 2000)
3642         ss = StorageServer(basedir, "\x00" * 20,
3643                            expiration_enabled=True,
3644                            expiration_mode="cutoff-date",
3645                            expiration_cutoff_date=then,
3646                            expiration_sharetypes=("mutable",))
3647         lc = ss.lease_checker
3648         lc.slow_start = 0
3649         webstatus = StorageStatus(ss)
3650
3651         self.make_shares(ss)
3652         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3653         # set all leases to be expirable
3654         new_expiration_time = now - 3000 + 31*24*60*60
3655
3656         def count_shares(si):
3657             return len(list(ss._iter_share_files(si)))
3658         def _get_sharefile(si):
3659             return list(ss._iter_share_files(si))[0]
3660         def count_leases(si):
3661             return len(list(_get_sharefile(si).get_leases()))
3662
3663         sf0 = _get_sharefile(immutable_si_0)
3664         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3665         sf1 = _get_sharefile(immutable_si_1)
3666         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3667         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3668         sf2 = _get_sharefile(mutable_si_2)
3669         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3670         sf3 = _get_sharefile(mutable_si_3)
3671         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3672         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3673
3674         ss.setServiceParent(self.s)
3675         def _wait():
3676             return lc.get_state()["last-cycle-finished"] is not None
3677         d = self.poll(_wait)
3678
3679         def _after_first_cycle(ignored):
3680             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3681             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3682             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3683             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3684             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3685             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3686         d.addCallback(_after_first_cycle)
3687         d.addCallback(lambda ign: self.render1(webstatus))
3688         def _check_html(html):
3689             s = remove_tags(html)
3690             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3691         d.addCallback(_check_html)
3692         return d
3693
3694     def test_bad_mode(self):
3695         basedir = "storage/LeaseCrawler/bad_mode"
3696         fileutil.make_dirs(basedir)
3697         e = self.failUnlessRaises(ValueError,
3698                                   StorageServer, basedir, "\x00" * 20,
3699                                   expiration_mode="bogus")
3700         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3701
3702     def test_parse_duration(self):
3703         DAY = 24*60*60
3704         MONTH = 31*DAY
3705         YEAR = 365*DAY
3706         p = time_format.parse_duration
3707         self.failUnlessEqual(p("7days"), 7*DAY)
3708         self.failUnlessEqual(p("31day"), 31*DAY)
3709         self.failUnlessEqual(p("60 days"), 60*DAY)
3710         self.failUnlessEqual(p("2mo"), 2*MONTH)
3711         self.failUnlessEqual(p("3 month"), 3*MONTH)
3712         self.failUnlessEqual(p("2years"), 2*YEAR)
3713         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3714         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3715
3716     def test_parse_date(self):
3717         p = time_format.parse_date
3718         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
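        # sanity check of the constant: 2009-03-18 00:00 UTC is 14321 days
        # after the 1970-01-01 epoch, and 14321 * 86400 == 1237334400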
3719         self.failUnlessEqual(p("2009-03-18"), 1237334400)
3720
3721     def test_limited_history(self):
3722         basedir = "storage/LeaseCrawler/limited_history"
3723         fileutil.make_dirs(basedir)
3724         ss = StorageServer(basedir, "\x00" * 20)
3725         # make it start sooner than usual.
3726         lc = ss.lease_checker
3727         lc.slow_start = 0
3728         lc.cpu_slice = 500
3729
3730         # create a few shares, with some leases on them
3731         self.make_shares(ss)
3732
3733         ss.setServiceParent(self.s)
3734
3735         def _wait_until_15_cycles_done():
3736             last = lc.state["last-cycle-finished"]
3737             if last is not None and last >= 15:
3738                 return True
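            # collapse the crawler's between-cycle idle time so the next
            # cycle starts immediately; fifteen real cycles would be far too
            # slow for a unit test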
3739             if lc.timer:
3740                 lc.timer.reset(0)
3741             return False
3742         d = self.poll(_wait_until_15_cycles_done)
3743
3744         def _check(ignored):
3745             s = lc.get_state()
3746             h = s["history"]
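            # the expirer keeps at most 10 completed cycles of history, so
            # after cycle 15 finishes the record covers cycles 6 through 15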
3747             self.failUnlessEqual(len(h), 10)
3748             self.failUnlessEqual(max(h.keys()), 15)
3749             self.failUnlessEqual(min(h.keys()), 6)
3750         d.addCallback(_check)
3751         return d
3752
3753     def test_unpredictable_future(self):
3754         basedir = "storage/LeaseCrawler/unpredictable_future"
3755         fileutil.make_dirs(basedir)
3756         ss = StorageServer(basedir, "\x00" * 20)
3757         # make it start sooner than usual.
3758         lc = ss.lease_checker
3759         lc.slow_start = 0
3760         lc.cpu_slice = -1.0 # stop quickly
3761
3762         self.make_shares(ss)
3763
3764         ss.setServiceParent(self.s)
3765
3766         d = fireEventually()
3767         def _check(ignored):
3768             # this should fire after the first bucket is complete, but before
3769             # the first prefix is complete, so the progress-measurer won't
3770             # think we've gotten far enough to raise our percent-complete
3771             # above 0%, triggering the cannot-predict-the-future code in
3772             # expirer.py . This will have to change if/when the
3773             # progress-measurer gets smart enough to count buckets (we'll
3774             # have to interrupt it even earlier, before it's finished the
3775             # first bucket).
3776             s = lc.get_state()
3777             if "cycle-to-date" not in s:
3778                 d2 = fireEventually()
3779                 d2.addCallback(_check)
3780                 return d2
3781             self.failUnlessIn("cycle-to-date", s)
3782             self.failUnlessIn("estimated-remaining-cycle", s)
3783             self.failUnlessIn("estimated-current-cycle", s)
3784
3785             left = s["estimated-remaining-cycle"]["space-recovered"]
3786             self.failUnlessEqual(left["actual-buckets"], None)
3787             self.failUnlessEqual(left["original-buckets"], None)
3788             self.failUnlessEqual(left["configured-buckets"], None)
3789             self.failUnlessEqual(left["actual-shares"], None)
3790             self.failUnlessEqual(left["original-shares"], None)
3791             self.failUnlessEqual(left["configured-shares"], None)
3792             self.failUnlessEqual(left["actual-diskbytes"], None)
3793             self.failUnlessEqual(left["original-diskbytes"], None)
3794             self.failUnlessEqual(left["configured-diskbytes"], None)
3795             self.failUnlessEqual(left["actual-sharebytes"], None)
3796             self.failUnlessEqual(left["original-sharebytes"], None)
3797             self.failUnlessEqual(left["configured-sharebytes"], None)
3798
3799             full = s["estimated-current-cycle"]["space-recovered"]
3800             self.failUnlessEqual(full["actual-buckets"], None)
3801             self.failUnlessEqual(full["original-buckets"], None)
3802             self.failUnlessEqual(full["configured-buckets"], None)
3803             self.failUnlessEqual(full["actual-shares"], None)
3804             self.failUnlessEqual(full["original-shares"], None)
3805             self.failUnlessEqual(full["configured-shares"], None)
3806             self.failUnlessEqual(full["actual-diskbytes"], None)
3807             self.failUnlessEqual(full["original-diskbytes"], None)
3808             self.failUnlessEqual(full["configured-diskbytes"], None)
3809             self.failUnlessEqual(full["actual-sharebytes"], None)
3810             self.failUnlessEqual(full["original-sharebytes"], None)
3811             self.failUnlessEqual(full["configured-sharebytes"], None)
3812
3813         d.addCallback(_check)
3814         return d
3815
3816     def test_no_st_blocks(self):
3817         basedir = "storage/LeaseCrawler/no_st_blocks"
3818         fileutil.make_dirs(basedir)
3819         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3820                                         expiration_mode="age",
3821                                         expiration_override_lease_duration=-1000)
3822         # a negative expiration_override_lease_duration= means the
3823         # "configured-" space-recovered counts will be non-zero, since all
3824         # shares will have expired by then
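        # (in "age" mode the expirer compares each lease's age against this
        # override instead of the lease's own 31-day timer, so a negative
        # duration marks every lease as already expired)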
3825
3826         # make it start sooner than usual.
3827         lc = ss.lease_checker
3828         lc.slow_start = 0
3829
3830         self.make_shares(ss)
3831         ss.setServiceParent(self.s)
3832         def _wait():
3833             return lc.get_state()["last-cycle-finished"] is not None
3834         d = self.poll(_wait)
3835
3836         def _check(ignored):
3837             s = lc.get_state()
3838             last = s["history"][0]
3839             rec = last["space-recovered"]
3840             self.failUnlessEqual(rec["configured-buckets"], 4)
3841             self.failUnlessEqual(rec["configured-shares"], 4)
3842             self.failUnless(rec["configured-sharebytes"] > 0,
3843                             rec["configured-sharebytes"])
3844             # without the .st_blocks field in os.stat() results, we should be
3845             # reporting diskbytes==sharebytes
3846             self.failUnlessEqual(rec["configured-sharebytes"],
3847                                  rec["configured-diskbytes"])
3848         d.addCallback(_check)
3849         return d
3850
3851     def test_share_corruption(self):
3852         self._poll_should_ignore_these_errors = [
3853             UnknownMutableContainerVersionError,
3854             UnknownImmutableContainerVersionError,
3855             ]
3856         basedir = "storage/LeaseCrawler/share_corruption"
3857         fileutil.make_dirs(basedir)
3858         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3859         w = StorageStatus(ss)
3860         # make it start sooner than usual.
3861         lc = ss.lease_checker
3862         lc.stop_after_first_bucket = True
3863         lc.slow_start = 0
3864         lc.cpu_slice = 500
3865
3866         # create a few shares, with some leases on them
3867         self.make_shares(ss)
3868
3869         # now corrupt one, and make sure the lease-checker keeps going
3870         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3871         first = min(self.sis)
3872         first_b32 = base32.b2a(first)
3873         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3874         f = open(fn, "rb+")
3875         f.seek(0)
3876         f.write("BAD MAGIC")
3877         f.close()
3878         # if get_share_file() doesn't see the correct mutable magic, it
3879         # assumes the file is an immutable share, and then
3880         # immutable.ShareFile sees a bad version. So regardless of which kind
3881         # of share we corrupted, this will trigger an
3882         # UnknownImmutableContainerVersionError.
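        # a sketch of that dispatch (assumed shape, not necessarily the exact
        # get_share_file() code):
        #   f = open(fn, "rb")
        #   prefix = f.read(32)
        #   f.close()
        #   if prefix == MutableShareFile.MAGIC:
        #       return MutableShareFile(fn)
        #   return ShareFile(fn)  # immutable; its version field is checked later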
3883
3884         # also create an empty bucket
3885         empty_si = "\x04"*16  # binary SI; storage_index_to_dir base32-encodes it
3886         empty_bucket_dir = os.path.join(ss.sharedir,
3887                                         storage_index_to_dir(empty_si))
3888         fileutil.make_dirs(empty_bucket_dir)
3889
3890         ss.setServiceParent(self.s)
3891
3892         d = fireEventually()
3893
3894         # now examine the state right after the first bucket has been
3895         # processed.
3896         def _after_first_bucket(ignored):
3897             s = lc.get_state()
3898             if "cycle-to-date" not in s:
3899                 d2 = fireEventually()
3900                 d2.addCallback(_after_first_bucket)
3901                 return d2
3902             so_far = s["cycle-to-date"]
3903             rec = so_far["space-recovered"]
3904             self.failUnlessEqual(rec["examined-buckets"], 1)
3905             self.failUnlessEqual(rec["examined-shares"], 0)
3906             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3907         d.addCallback(_after_first_bucket)
3908
3909         d.addCallback(lambda ign: self.render_json(w))
3910         def _check_json(json):
3911             data = simplejson.loads(json)
3912             # grr. json turns all dict keys into strings.
3913             so_far = data["lease-checker"]["cycle-to-date"]
3914             corrupt_shares = so_far["corrupt-shares"]
3915             # it also turns all tuples into lists
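            # e.g. simplejson.loads(simplejson.dumps({0: ("a", 1)})) comes
            # back as {"0": ["a", 1]}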
3916             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3917         d.addCallback(_check_json)
3918         d.addCallback(lambda ign: self.render1(w))
3919         def _check_html(html):
3920             s = remove_tags(html)
3921             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3922         d.addCallback(_check_html)
3923
3924         def _wait():
3925             return lc.get_state()["last-cycle-finished"] is not None
3926         d.addCallback(lambda ign: self.poll(_wait))
3927
3928         def _after_first_cycle(ignored):
3929             s = lc.get_state()
3930             last = s["history"][0]
3931             rec = last["space-recovered"]
3932             self.failUnlessEqual(rec["examined-buckets"], 5)
3933             self.failUnlessEqual(rec["examined-shares"], 3)
3934             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3935         d.addCallback(_after_first_cycle)
3936         d.addCallback(lambda ign: self.render_json(w))
3937         def _check_json_history(json):
3938             data = simplejson.loads(json)
3939             last = data["lease-checker"]["history"]["0"]
3940             corrupt_shares = last["corrupt-shares"]
3941             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3942         d.addCallback(_check_json_history)
3943         d.addCallback(lambda ign: self.render1(w))
3944         def _check_html_history(html):
3945             s = remove_tags(html)
3946             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3947         d.addCallback(_check_html_history)
3948
3949         def _cleanup(res):
3950             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3951                                    UnknownImmutableContainerVersionError)
3952             return res
3953         d.addBoth(_cleanup)
3954         return d
3955
3956     def render_json(self, page):
3957         d = self.render1(page, args={"t": ["json"]})
3958         return d
3959
3960 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3961
3962     def setUp(self):
3963         self.s = service.MultiService()
3964         self.s.startService()
3965     def tearDown(self):
3966         return self.s.stopService()
3967
3968     def test_no_server(self):
3969         w = StorageStatus(None)
3970         html = w.renderSynchronously()
3971         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3972
3973     def test_status(self):
3974         basedir = "storage/WebStatus/status"
3975         fileutil.make_dirs(basedir)
3976         nodeid = "\x00" * 20
3977         ss = StorageServer(basedir, nodeid)
3978         ss.setServiceParent(self.s)
3979         w = StorageStatus(ss, "nickname")
3980         d = self.render1(w)
3981         def _check_html(html):
3982             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3983             s = remove_tags(html)
3984             self.failUnlessIn("Server Nickname: nickname", s)
3985             self.failUnlessIn("Server Nodeid: %s"  % base32.b2a(nodeid), s)
3986             self.failUnlessIn("Accepting new shares: Yes", s)
3987             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3988         d.addCallback(_check_html)
3989         d.addCallback(lambda ign: self.render_json(w))
3990         def _check_json(json):
3991             data = simplejson.loads(json)
3992             s = data["stats"]
3993             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3994             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3995             self.failUnlessIn("bucket-counter", data)
3996             self.failUnlessIn("lease-checker", data)
3997         d.addCallback(_check_json)
3998         return d
3999
4000     def render_json(self, page):
4001         d = self.render1(page, args={"t": ["json"]})
4002         return d
4003
4004     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4005     def test_status_no_disk_stats(self, mock_get_disk_stats):
4006         mock_get_disk_stats.side_effect = AttributeError()
4007
4008         # Some platforms may have no disk stats API. Make sure the code can handle that
4009         # (test runs on all platforms).
4010         basedir = "storage/WebStatus/status_no_disk_stats"
4011         fileutil.make_dirs(basedir)
4012         ss = StorageServer(basedir, "\x00" * 20)
4013         ss.setServiceParent(self.s)
4014         w = StorageStatus(ss)
4015         html = w.renderSynchronously()
4016         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4017         s = remove_tags(html)
4018         self.failUnlessIn("Accepting new shares: Yes", s)
4019         self.failUnlessIn("Total disk space: ?", s)
4020         self.failUnlessIn("Space Available to Tahoe: ?", s)
4021         self.failUnless(ss.get_available_space() is None)
4022
4023     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4024     def test_status_bad_disk_stats(self, mock_get_disk_stats):
4025         mock_get_disk_stats.side_effect = OSError()
4026
4027         # If the API to get disk stats exists but a call to it fails, then the status should
4028         # show that no shares will be accepted, and get_available_space() should be 0.
4029         basedir = "storage/WebStatus/status_bad_disk_stats"
4030         fileutil.make_dirs(basedir)
4031         ss = StorageServer(basedir, "\x00" * 20)
4032         ss.setServiceParent(self.s)
4033         w = StorageStatus(ss)
4034         html = w.renderSynchronously()
4035         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4036         s = remove_tags(html)
4037         self.failUnlessIn("Accepting new shares: No", s)
4038         self.failUnlessIn("Total disk space: ?", s)
4039         self.failUnlessIn("Space Available to Tahoe: ?", s)
4040         self.failUnlessEqual(ss.get_available_space(), 0)
4041
4042     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4043     def test_status_right_disk_stats(self, mock_get_disk_stats):
4044         GB = 1000000000
4045         total            = 5*GB
4046         free_for_root    = 4*GB
4047         free_for_nonroot = 3*GB
4048         reserved_space   = 1*GB
4049         used = total - free_for_root
4050         avail = max(free_for_nonroot - reserved_space, 0)
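        # with these figures avail = max(3 GB - 1 GB, 0) = 2 GB, which is
        # what the "Space Available to Tahoe" assertions below expect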
4051         mock_get_disk_stats.return_value = {
4052             'total': total,
4053             'free_for_root': free_for_root,
4054             'free_for_nonroot': free_for_nonroot,
4055             'used': used,
4056             'avail': avail,
4057         }
4058
4059         basedir = "storage/WebStatus/status_right_disk_stats"
4060         fileutil.make_dirs(basedir)
4061         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4062         expecteddir = ss.sharedir
4063         ss.setServiceParent(self.s)
4064         w = StorageStatus(ss)
4065         html = w.renderSynchronously()
4066
4067         self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4068                     mock_get_disk_stats.call_args_list)
4069
4070         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4071         s = remove_tags(html)
4072         self.failUnlessIn("Total disk space: 5.00 GB", s)
4073         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4074         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4075         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4076         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4077         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4078         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4079
4080     def test_readonly(self):
4081         basedir = "storage/WebStatus/readonly"
4082         fileutil.make_dirs(basedir)
4083         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4084         ss.setServiceParent(self.s)
4085         w = StorageStatus(ss)
4086         html = w.renderSynchronously()
4087         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4088         s = remove_tags(html)
4089         self.failUnlessIn("Accepting new shares: No", s)
4090
4091     def test_reserved(self):
4092         basedir = "storage/WebStatus/reserved"
4093         fileutil.make_dirs(basedir)
4094         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4095         ss.setServiceParent(self.s)
4096         w = StorageStatus(ss)
4097         html = w.renderSynchronously()
4098         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4099         s = remove_tags(html)
4100         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4101
4102     def test_huge_reserved(self):
4103         basedir = "storage/WebStatus/huge_reserved"
4104         fileutil.make_dirs(basedir)
4105         ss = StorageServer(basedir, "\x00" * 20, reserved_space=15000000000)
4106         ss.setServiceParent(self.s)
4107         w = StorageStatus(ss)
4108         html = w.renderSynchronously()
4109         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4110         s = remove_tags(html)
4111         self.failUnlessIn("Reserved space: - 15.00 GB (15000000000)", s)
4112
4113     def test_util(self):
4114         w = StorageStatus(None)
4115         self.failUnlessEqual(w.render_space(None, None), "?")
4116         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4117         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4118         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4119         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4120         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)