
import time, os.path, platform, stat, re, simplejson, struct, shutil

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
                                     SIGNED_PREFIX, MDMFHEADER, \
                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
                                     PRIVATE_KEY_SIZE, \
                                     SIGNATURE_SIZE, \
                                     VERIFICATION_KEY_SIZE, \
                                     SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix

class Marker:
    pass
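# FakeCanary stands in for the foolscap "canary" object that a real
# storage client passes to allocate_buckets(): it just records the
# disconnect callbacks so tests can fire them by hand (see
# Server.test_disconnect below).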
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

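# A stats provider that discards counts and producer registrations, so a
# StorageServer can be instantiated without a real stats gatherer.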
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

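    # make_lease builds a LeaseInfo with fresh random 32-byte renew and
    # cancel secrets, an all-zero node id, and an expiration comfortably
    # beyond the test run.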
    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an
        # immutable share file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = 'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = 'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime

        share_file_data = containerdata + share_data + lease_data
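        # The assembled v1 container (header fields: version, share-data
        # size, lease count -- all 1 here) is laid out as: 12-byte header,
        # then the 1-byte share data ('a'), then one 72-byte lease of
        # owner number (4) + renew secret (32) + cancel secret (32) +
        # expiration time (4).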

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        class MockStorageServer(object):
            def add_latency(self, category, latency):
                pass
            def count(self, name, delta=1):
                pass

        mockstorageserver = MockStorageServer()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
        # Try to read past the end of the share data, far enough to reach
        # the cancel secret; only the share data should come back.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.remote_read(0, read_length)
        self.failUnlessEqual(result_of_read, share_data)

        result_of_read = br.remote_read(0, len(share_data)+1)
        self.failUnlessEqual(result_of_read, share_data)

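# RemoteBucket exposes a local bucket object through the same
# callRemote() interface a foolscap RemoteReference would offer, and
# counts reads and writes so tests can assert on the number of remote
# calls performed.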
class RemoteBucket:

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 (v1 header) + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 (v2 header) + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)

    def test_declares_maximum_share_sizes(self):
        ss = self.create("test_declares_maximum_share_sizes")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnlessIn('maximum-immutable-share-size', sv1)
        self.failUnlessIn('maximum-mutable-share-size', sv1)

    def test_declares_available_space(self):
        ss = self.create("test_declares_available_space")
        ver = ss.remote_get_version()
        sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
        self.failUnlessIn('available-space', sv1)

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("This test is very expensive on filesystems that lack efficient sparse-file support (e.g. Mac OS X and Windows).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares many leading bits with that of an extant share (but is not
        exactly the same storage index) does not add a new entry to the
        top-level share directory.
        """
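        # Shares are stored under shares/<two-character prefix>/<full
        # storage index>/<shnum>, so both storage indexes used below should
        # land in the same prefix directory.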
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.failUnless(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
        self.failIfEqual(ss.allocated_size(), 0)

        # Now abort the writers.
        for writer in writers.itervalues():
            writer.remote_abort()
        self.failUnlessEqual(ss.allocated_size(), 0)


    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
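        # the BucketReader repr should include the first twelve base32
        # characters of the storage index ("allocate" encodes to
        # "mfwgy33dmf2g...") and the share number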
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnlessIn(" had version 0 but we wanted 1", str(e))

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

    def test_reserved_space(self):
        reserved = 10000
        allocated = 0

        def call_get_disk_stats(whichdir, reserved_space=0):
            self.failUnlessEqual(reserved_space, reserved)
            return {
              'free_for_nonroot': 15000 - allocated,
              'avail': max(15000 - allocated - reserved_space, 0),
            }
        self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)

        ss = self.create("test_reserved_space", reserved_space=reserved)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
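        # (OVERHEAD is the three 4-byte container header fields; a lease is
        # owner number (4) + renew secret (32) + cancel secret (32) +
        # expiration time (4) = 72 bytes.)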
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        # this also changes the amount reported as available by call_get_disk_stats
        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.failIf(hasattr(ss, 'remote_cancel_lease'),
                    "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used by other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnlessIn(si0_s, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si1_s, report)
        self.failUnlessIn("share_number: 1", report)
        self.failUnlessIn("This share tastes like dust.", report)



class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
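        # each (testv, datav, new_length) entry is empty: no test vectors
        # to check, no data to write, and no truncation -- this just
        # creates the (empty) shares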
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # Trying to make the container too large (by sending a write vector
        # whose offset is too high) will raise an exception.
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(TOOBIG,data)], None)},
                              [])

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], None)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:10]]})

        # Sending a new_length shorter than the current length truncates the
        # data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 9)},
                         [])
        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a new_length longer than the current length doesn't change
        # the data.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        read_answer = read("si1", [0], [(0, 20)])
        self.failUnlessEqual(read_answer, {0: [data[:9]]})

        # Sending a write vector whose start is after the end of the current
        # data doesn't reveal "whatever was there last time" (palimpsest),
        # but instead fills with zeroes.

        # To test this, we fill the data area with a recognizable pattern.
        pattern = ''.join([chr(i) for i in range(100)])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0, pattern)], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Then truncate the data...
        answer = rstaraw("si1", secrets,
                         {0: ([], [], 20)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Just confirm that you get an empty string if you try to read from
        # past the (new) endpoint now.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 1980)])
        self.failUnlessEqual(answer, (True, {0:[''],1:[''],2:['']}))

        # Then extend the file by writing a vector which starts past the
        # end...
        answer = rstaraw("si1", secrets,
                         {0: ([], [(50, 'hellothere')], None)},
                         [])
        assert answer == (True, {0:[],1:[],2:[]})
        # Now if you read the stuff between 20 (where we earlier truncated)
        # and 50, it had better be all zeroes.
        answer = rstaraw("si1", secrets,
                         {0: ([], [], None)},
                         [(20, 30)])
        self.failUnlessEqual(answer, (True, {0:['\x00'*30],1:[''],2:['']}))

        # Also see if the server explicitly declares that it supports this
        # feature.
        ver = ss.remote_get_version()
        storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
        self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))

        # If the size is dropped to zero the share is deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})


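    # The test vectors used below are (offset, length, operator, specimen)
    # tuples: the write vector is applied only if comparing the existing
    # share data at [offset:offset+length] against the specimen succeeds.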
    def test_operators(self):
        # Test each comparison operator; the data being compared against is
        # '11111' in all cases. Test both failing and passing cases, and
        # reset the data after each one.
1012         ss = self.create("test_operators")
1013
1014         secrets = ( self.write_enabler("we1"),
1015                     self.renew_secret("we1"),
1016                     self.cancel_secret("we1") )
1017         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1018         write = ss.remote_slot_testv_and_readv_and_writev
1019         read = ss.remote_slot_readv
1020
1021         def reset():
1022             write("si1", secrets,
1023                   {0: ([], [(0,data)], None)},
1024                   [])
1025
1026         reset()
1027
1028         #  lt
1029         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
1030                                              ],
1031                                             [(0, "x"*100)],
1032                                             None,
1033                                             )}, [(10,5)])
1034         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1035         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1036         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
1037         reset()
1038
1039         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
1040                                              ],
1041                                             [(0, "x"*100)],
1042                                             None,
1043                                             )}, [(10,5)])
1044         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1045         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1046         reset()
1047
1048         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
1049                                              ],
1050                                             [(0, "y"*100)],
1051                                             None,
1052                                             )}, [(10,5)])
1053         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1054         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1055         reset()
1056
1057         #  le
1058         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
1059                                              ],
1060                                             [(0, "x"*100)],
1061                                             None,
1062                                             )}, [(10,5)])
1063         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1064         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1065         reset()
1066
1067         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
1068                                              ],
1069                                             [(0, "y"*100)],
1070                                             None,
1071                                             )}, [(10,5)])
1072         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1073         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1074         reset()
1075
1076         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
1077                                              ],
1078                                             [(0, "y"*100)],
1079                                             None,
1080                                             )}, [(10,5)])
1081         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1082         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1083         reset()
1084
1085         #  eq
1086         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
1087                                              ],
1088                                             [(0, "x"*100)],
1089                                             None,
1090                                             )}, [(10,5)])
1091         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1092         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1093         reset()
1094
1095         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
1096                                              ],
1097                                             [(0, "y"*100)],
1098                                             None,
1099                                             )}, [(10,5)])
1100         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1101         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1102         reset()
1103
1104         #  ne
1105         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
1106                                              ],
1107                                             [(0, "x"*100)],
1108                                             None,
1109                                             )}, [(10,5)])
1110         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1111         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1112         reset()
1113
1114         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
1115                                              ],
1116                                             [(0, "y"*100)],
1117                                             None,
1118                                             )}, [(10,5)])
1119         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1120         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1121         reset()
1122
1123         #  ge
1124         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
1125                                              ],
1126                                             [(0, "y"*100)],
1127                                             None,
1128                                             )}, [(10,5)])
1129         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1130         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1131         reset()
1132
1133         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1134                                              ],
1135                                             [(0, "y"*100)],
1136                                             None,
1137                                             )}, [(10,5)])
1138         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1139         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1140         reset()
1141
1142         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1143                                              ],
1144                                             [(0, "y"*100)],
1145                                             None,
1146                                             )}, [(10,5)])
1147         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1148         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1149         reset()
1150
1151         #  gt
1152         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1153                                              ],
1154                                             [(0, "y"*100)],
1155                                             None,
1156                                             )}, [(10,5)])
1157         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1158         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1159         reset()
1160
1161         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1162                                              ],
1163                                             [(0, "x"*100)],
1164                                             None,
1165                                             )}, [(10,5)])
1166         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1167         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1168         reset()
1169
1170         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1171                                              ],
1172                                             [(0, "x"*100)],
1173                                             None,
1174                                             )}, [(10,5)])
1175         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1176         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1177         reset()
1178
1179         # finally, test some operators against empty shares
1180         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1181                                              ],
1182                                             [(0, "x"*100)],
1183                                             None,
1184                                             )}, [(10,5)])
1185         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1186         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1187         reset()
1188
1189     def test_readv(self):
1190         ss = self.create("test_readv")
1191         secrets = ( self.write_enabler("we1"),
1192                     self.renew_secret("we1"),
1193                     self.cancel_secret("we1") )
1194         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1195         write = ss.remote_slot_testv_and_readv_and_writev
1196         read = ss.remote_slot_readv
1197         data = [("%d" % i) * 100 for i in range(3)]
1198         rc = write("si1", secrets,
1199                    {0: ([], [(0,data[0])], None),
1200                     1: ([], [(0,data[1])], None),
1201                     2: ([], [(0,data[2])], None),
1202                     }, [])
1203         self.failUnlessEqual(rc, (True, {}))
1204
1205         answer = read("si1", [], [(0, 10)])
1206         self.failUnlessEqual(answer, {0: ["0"*10],
1207                                       1: ["1"*10],
1208                                       2: ["2"*10]})
1209
1210     def compare_leases_without_timestamps(self, leases_a, leases_b):
1211         self.failUnlessEqual(len(leases_a), len(leases_b))
1212         for i in range(len(leases_a)):
1213             a = leases_a[i]
1214             b = leases_b[i]
1215             self.failUnlessEqual(a.owner_num,       b.owner_num)
1216             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1217             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1218             self.failUnlessEqual(a.nodeid,          b.nodeid)
1219
1220     def compare_leases(self, leases_a, leases_b):
1221         self.failUnlessEqual(len(leases_a), len(leases_b))
1222         for i in range(len(leases_a)):
1223             a = leases_a[i]
1224             b = leases_b[i]
1225             self.failUnlessEqual(a.owner_num,       b.owner_num)
1226             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1227             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1228             self.failUnlessEqual(a.nodeid,          b.nodeid)
1229             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1230
1231     def test_leases(self):
1232         ss = self.create("test_leases")
1233         def secrets(n):
1234             return ( self.write_enabler("we1"),
1235                      self.renew_secret("we1-%d" % n),
1236                      self.cancel_secret("we1-%d" % n) )
1237         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1238         write = ss.remote_slot_testv_and_readv_and_writev
1239         read = ss.remote_slot_readv
1240         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1241         self.failUnlessEqual(rc, (True, {}))
1242
1243         # create a random non-numeric file in the bucket directory, to
1244         # exercise the code that's supposed to ignore those.
1245         bucket_dir = os.path.join(self.workdir("test_leases"),
1246                                   "shares", storage_index_to_dir("si1"))
1247         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1248         f.write("you ought to be ignoring me\n")
1249         f.close()
1250
1251         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1252         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1253
1254         # add-lease on a missing storage index is silently ignored
1255         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1256
1257         # re-allocate the slots and use the same secrets; that should
1258         # update the lease
1259         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1260         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1261
1262         # renew it directly
1263         ss.remote_renew_lease("si1", secrets(0)[1])
1264         self.failUnlessEqual(len(list(s0.get_leases())), 1)
1265
1266         # now allocate them with a bunch of different secrets, to trigger the
1267         # extended lease code. Use add_lease for one of them.
1268         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1269         self.failUnlessEqual(len(list(s0.get_leases())), 2)
1270         secrets2 = secrets(2)
1271         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1272         self.failUnlessEqual(len(list(s0.get_leases())), 3)
1273         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1274         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1275         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1276
1277         self.failUnlessEqual(len(list(s0.get_leases())), 6)
1278
1279         all_leases = list(s0.get_leases())
1280         # and write enough data to expand the container, forcing the server
1281         # to move the leases
1282         write("si1", secrets(0),
1283               {0: ([], [(0,data)], 200), },
1284               [])
1285
1286         # read back the leases, make sure they're still intact.
1287         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1288
1289         ss.remote_renew_lease("si1", secrets(0)[1])
1290         ss.remote_renew_lease("si1", secrets(1)[1])
1291         ss.remote_renew_lease("si1", secrets(2)[1])
1292         ss.remote_renew_lease("si1", secrets(3)[1])
1293         ss.remote_renew_lease("si1", secrets(4)[1])
1294         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1295         # get a new copy of the leases, with the current timestamps. Reading
1296         # data and failing to renew/cancel leases should leave the timestamps
1297         # alone.
1298         all_leases = list(s0.get_leases())
1299         # renewing with a bogus token should prompt an error message
1300
1301         # examine the exception thus raised, make sure the old nodeid is
1302         # present, to provide for share migration
1303         e = self.failUnlessRaises(IndexError,
1304                                   ss.remote_renew_lease, "si1",
1305                                   secrets(20)[1])
1306         e_s = str(e)
1307         self.failUnlessIn("Unable to renew non-existent lease", e_s)
1308         self.failUnlessIn("I have leases accepted by nodeids:", e_s)
1309         self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
1310
1311         self.compare_leases(all_leases, list(s0.get_leases()))
1312
1313         # reading shares should not modify the timestamp
1314         read("si1", [], [(0,200)])
1315         self.compare_leases(all_leases, list(s0.get_leases()))
1316
1317         write("si1", secrets(0),
1318               {0: ([], [(200, "make me bigger")], None)}, [])
1319         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1320
1321         write("si1", secrets(0),
1322               {0: ([], [(500, "make me really bigger")], None)}, [])
1323         self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1324
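    # (Editorial sketch of the lease behaviour exercised above.) Writing
    # with a (write-enabler, renew-secret, cancel-secret) triple that the
    # server has already seen renews the existing lease in place, while a
    # new renew-secret adds a lease; remote_add_lease does the same
    # without writing:
    #
    #     write("si1", secrets(0), {0: ([], [(0, data)], None)}, [])  # 1 lease
    #     write("si1", secrets(0), {0: ([], [(0, data)], None)}, [])  # still 1
    #     ss.remote_add_lease("si1", secrets(6)[1], secrets(6)[2])    # now 2
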
1325     def test_remove(self):
1326         ss = self.create("test_remove")
1327         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1328                       set([0,1,2]), 100)
1329         readv = ss.remote_slot_readv
1330         writev = ss.remote_slot_testv_and_readv_and_writev
1331         secrets = ( self.write_enabler("we1"),
1332                     self.renew_secret("we1"),
1333                     self.cancel_secret("we1") )
1334         # delete sh0 by setting its size to zero
1335         answer = writev("si1", secrets,
1336                         {0: ([], [], 0)},
1337                         [])
1338         # the answer should mention all the shares that existed before the
1339         # write
1340         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1341         # but a new read should show only sh1 and sh2
1342         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1343                              {1: [""], 2: [""]})
1344
1345         # delete sh1 by setting its size to zero
1346         answer = writev("si1", secrets,
1347                         {1: ([], [], 0)},
1348                         [])
1349         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1350         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1351                              {2: [""]})
1352
1353         # delete sh2 by setting its size to zero
1354         answer = writev("si1", secrets,
1355                         {2: ([], [], 0)},
1356                         [])
1357         self.failUnlessEqual(answer, (True, {2:[]}) )
1358         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1359                              {})
1360         # and the bucket directory should now be gone
1361         si = base32.b2a("si1")
1362         # note: this is a detail of the storage server implementation, and
1363         # may change in the future
1364         prefix = si[:2]
1365         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1366         bucketdir = os.path.join(prefixdir, si)
1367         self.failUnless(os.path.exists(prefixdir), prefixdir)
1368         self.failIf(os.path.exists(bucketdir), bucketdir)
1369
1370
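# (Editorial sketch of the deletion idiom used in test_remove above.) A
# writev with new_length=0 and no test or write vectors deletes a share
# outright, and deleting the last share removes the bucket directory:
#
#     writev("si1", secrets, {0: ([], [], 0)}, [])   # delete share 0
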
1371 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1372     def setUp(self):
1373         self.sparent = LoggingServiceParent()
1374         self._lease_secret = itertools.count()
1375         self.ss = self.create("MDMFProxies storage test server")
1376         self.rref = RemoteBucket()
1377         self.rref.target = self.ss
1378         self.secrets = (self.write_enabler("we_secret"),
1379                         self.renew_secret("renew_secret"),
1380                         self.cancel_secret("cancel_secret"))
1381         self.segment = "aaaaaa"
1382         self.block = "aa"
1383         self.salt = "a" * 16
1384         self.block_hash = "a" * 32
1385         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1386         self.share_hash = self.block_hash
1387         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1388         self.signature = "foobarbaz"
1389         self.verification_key = "vvvvvv"
1390         self.encprivkey = "private"
1391         self.root_hash = self.block_hash
1392         self.salt_hash = self.root_hash
1393         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1394         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1395         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1396         # blockhashes and salt hashes are serialized in the same way,
1397         # except that we lop off the first element and store it in the
1398         # header.
1399         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1400
1401
1402     def tearDown(self):
1403         self.sparent.stopService()
1404         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1405
1406
1407     def write_enabler(self, we_tag):
1408         return hashutil.tagged_hash("we_blah", we_tag)
1409
1410
1411     def renew_secret(self, tag):
1412         return hashutil.tagged_hash("renew_blah", str(tag))
1413
1414
1415     def cancel_secret(self, tag):
1416         return hashutil.tagged_hash("cancel_blah", str(tag))
1417
1418
1419     def workdir(self, name):
1420         basedir = os.path.join("storage", "MutableServer", name)
1421         return basedir
1422
1423
1424     def create(self, name):
1425         workdir = self.workdir(name)
1426         ss = StorageServer(workdir, "\x00" * 20)
1427         ss.setServiceParent(self.sparent)
1428         return ss
1429
1430
1431     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1432         # Start with the checkstring
1433         data = struct.pack(">BQ32s",
1434                            1,
1435                            0,
1436                            self.root_hash)
1437         self.checkstring = data
1438         # Next, the encoding parameters
1439         if tail_segment:
1440             data += struct.pack(">BBQQ",
1441                                 3,
1442                                 10,
1443                                 6,
1444                                 33)
1445         elif empty:
1446             data += struct.pack(">BBQQ",
1447                                 3,
1448                                 10,
1449                                 0,
1450                                 0)
1451         else:
1452             data += struct.pack(">BBQQ",
1453                                 3,
1454                                 10,
1455                                 6,
1456                                 36)
1457         # Now we'll build the share data and the offset table.
1458         sharedata = ""
1459         if not tail_segment and not empty:
1460             for i in xrange(6):
1461                 sharedata += self.salt + self.block
1462         elif tail_segment:
1463             for i in xrange(5):
1464                 sharedata += self.salt + self.block
1465             sharedata += self.salt + "a"
1466
1467         # The encrypted private key comes after the shares + salts
1468         offset_size = struct.calcsize(MDMFOFFSETS)
1469         encrypted_private_key_offset = len(data) + offset_size
1470         # The share hash chain comes after the private key
1471         sharehashes_offset = encrypted_private_key_offset + \
1472             len(self.encprivkey)
1473
1474         # The signature comes after the share hash chain.
1475         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1476
1477         verification_key_offset = signature_offset + len(self.signature)
1478         verification_key_end = verification_key_offset + \
1479             len(self.verification_key)
1480
1481         share_data_offset = offset_size
1482         share_data_offset += PRIVATE_KEY_SIZE
1483         share_data_offset += SIGNATURE_SIZE
1484         share_data_offset += VERIFICATION_KEY_SIZE
1485         share_data_offset += SHARE_HASH_CHAIN_SIZE
1486
1487         blockhashes_offset = share_data_offset + len(sharedata)
1488         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1489
1490         data += struct.pack(MDMFOFFSETS,
1491                             encrypted_private_key_offset,
1492                             sharehashes_offset,
1493                             signature_offset,
1494                             verification_key_offset,
1495                             verification_key_end,
1496                             share_data_offset,
1497                             blockhashes_offset,
1498                             eof_offset)
1499
1500         self.offsets = {}
1501         self.offsets['enc_privkey'] = encrypted_private_key_offset
1502         self.offsets['block_hash_tree'] = blockhashes_offset
1503         self.offsets['share_hash_chain'] = sharehashes_offset
1504         self.offsets['signature'] = signature_offset
1505         self.offsets['verification_key'] = verification_key_offset
1506         self.offsets['share_data'] = share_data_offset
1507         self.offsets['verification_key_end'] = verification_key_end
1508         self.offsets['EOF'] = eof_offset
1509
1510         # the private key,
1511         data += self.encprivkey
1512         # the sharehashes
1513         data += self.share_hash_chain_s
1514         # the signature,
1515         data += self.signature
1516         # and the verification key
1517         data += self.verification_key
1518         # Then we'll pad with spaces until we reach the share data offset.
1519         padding = " " * (share_data_offset - len(data))
1520         data += padding
1521
1522         # Then the share data
1523         data += sharedata
1524         # the blockhashes
1525         data += self.block_hash_tree_s
1526         return data
1527
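    # (Editorial summary, derived from the struct formats above.) The
    # fixed MDMF header this helper builds is laid out as:
    #
    #     offset  0: version number    (">B",   1 byte, == 1)
    #     offset  1: sequence number   (">Q",   8 bytes)
    #     offset  9: root hash         ("32s", 32 bytes)
    #     offset 41: k, then N         (">BB",  1 byte each)
    #     offset 43: segment size      (">Q",   8 bytes)
    #     offset 51: data length       (">Q",   8 bytes)
    #     offset 59: offset table      (MDMFOFFSETS, eight 8-byte fields)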
1528
1529     def write_test_share_to_server(self,
1530                                    storage_index,
1531                                    tail_segment=False,
1532                                    empty=False):
1533         """
1534         I write some data to self.ss for the read tests to read.
1535
1536         If tail_segment=True, then I will write a share that has a
1537         smaller tail segment than the other segments.
1538         """
1539         write = self.ss.remote_slot_testv_and_readv_and_writev
1540         data = self.build_test_mdmf_share(tail_segment, empty)
1541         # Finally, we write the whole thing to the storage server in one
1542         # pass.
1543         testvs = [(0, 1, "eq", "")]
1544         tws = {}
1545         tws[0] = (testvs, [(0, data)], None)
1546         readv = [(0, 1)]
1547         results = write(storage_index, self.secrets, tws, readv)
1548         self.failUnless(results[0])
1549
1550
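    # (Editorial note on the test vector above.) The (0, 1, "eq", "")
    # test vector compares one byte at offset 0 against the empty
    # string; only a missing (or zero-length) share reads back as "",
    # so the write succeeds only on a fresh slot and cannot clobber an
    # existing share.
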
1551     def build_test_sdmf_share(self, empty=False):
1552         if empty:
1553             sharedata = ""
1554         else:
1555             sharedata = self.segment * 6
1556         self.sharedata = sharedata
1557         blocksize = len(sharedata) / 3
1558         block = sharedata[:blocksize]
1559         self.blockdata = block
1560         prefix = struct.pack(">BQ32s16s BBQQ",
1561                              0, # version,
1562                              0,
1563                              self.root_hash,
1564                              self.salt,
1565                              3,
1566                              10,
1567                              len(sharedata),
1568                              len(sharedata),
1569                             )
1570         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1571         signature_offset = post_offset + len(self.verification_key)
1572         sharehashes_offset = signature_offset + len(self.signature)
1573         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1574         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1575         encprivkey_offset = sharedata_offset + len(block)
1576         eof_offset = encprivkey_offset + len(self.encprivkey)
1577         offsets = struct.pack(">LLLLQQ",
1578                               signature_offset,
1579                               sharehashes_offset,
1580                               blockhashes_offset,
1581                               sharedata_offset,
1582                               encprivkey_offset,
1583                               eof_offset)
1584         final_share = "".join([prefix,
1585                                offsets,
1586                                self.verification_key,
1587                                self.signature,
1588                                self.share_hash_chain_s,
1589                                self.block_hash_tree_s,
1590                                block,
1591                                self.encprivkey])
1592         self.offsets = {}
1593         self.offsets['signature'] = signature_offset
1594         self.offsets['share_hash_chain'] = sharehashes_offset
1595         self.offsets['block_hash_tree'] = blockhashes_offset
1596         self.offsets['share_data'] = sharedata_offset
1597         self.offsets['enc_privkey'] = encprivkey_offset
1598         self.offsets['EOF'] = eof_offset
1599         return final_share
1600
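    # (Editorial note, for contrast with the MDMF layout above.) In the
    # SDMF container the 16-byte salt lives in the signed header, the
    # offset table packs four 4-byte and two 8-byte fields (">LLLLQQ"),
    # the verification key precedes the signature, and the encrypted
    # private key trails the share data at the end of the container.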
1601
1602     def write_sdmf_share_to_server(self,
1603                                    storage_index,
1604                                    empty=False):
1605         # Some tests need SDMF shares to verify that we can still read
1606         # them. This method writes one that approximates a real share.
1607         assert self.rref
1608         write = self.ss.remote_slot_testv_and_readv_and_writev
1609         share = self.build_test_sdmf_share(empty)
1610         testvs = [(0, 1, "eq", "")]
1611         tws = {}
1612         tws[0] = (testvs, [(0, share)], None)
1613         readv = []
1614         results = write(storage_index, self.secrets, tws, readv)
1615         self.failUnless(results[0])
1616
1617
1618     def test_read(self):
1619         self.write_test_share_to_server("si1")
1620         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1621         # Check that every getter returns what we expect it to.
1622         d = defer.succeed(None)
1623         def _check_block_and_salt((block, salt)):
1624             self.failUnlessEqual(block, self.block)
1625             self.failUnlessEqual(salt, self.salt)
1626
1627         for i in xrange(6):
1628             d.addCallback(lambda ignored, i=i:
1629                 mr.get_block_and_salt(i))
1630             d.addCallback(_check_block_and_salt)
1631
1632         d.addCallback(lambda ignored:
1633             mr.get_encprivkey())
1634         d.addCallback(lambda encprivkey:
1635             self.failUnlessEqual(self.encprivkey, encprivkey))
1636
1637         d.addCallback(lambda ignored:
1638             mr.get_blockhashes())
1639         d.addCallback(lambda blockhashes:
1640             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1641
1642         d.addCallback(lambda ignored:
1643             mr.get_sharehashes())
1644         d.addCallback(lambda sharehashes:
1645             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1646
1647         d.addCallback(lambda ignored:
1648             mr.get_signature())
1649         d.addCallback(lambda signature:
1650             self.failUnlessEqual(signature, self.signature))
1651
1652         d.addCallback(lambda ignored:
1653             mr.get_verification_key())
1654         d.addCallback(lambda verification_key:
1655             self.failUnlessEqual(verification_key, self.verification_key))
1656
1657         d.addCallback(lambda ignored:
1658             mr.get_seqnum())
1659         d.addCallback(lambda seqnum:
1660             self.failUnlessEqual(seqnum, 0))
1661
1662         d.addCallback(lambda ignored:
1663             mr.get_root_hash())
1664         d.addCallback(lambda root_hash:
1665             self.failUnlessEqual(self.root_hash, root_hash))
1666
1667         d.addCallback(lambda ignored:
1668             mr.get_seqnum())
1669         d.addCallback(lambda seqnum:
1670             self.failUnlessEqual(0, seqnum))
1671
1672         d.addCallback(lambda ignored:
1673             mr.get_encoding_parameters())
1674         def _check_encoding_parameters((k, n, segsize, datalen)):
1675             self.failUnlessEqual(k, 3)
1676             self.failUnlessEqual(n, 10)
1677             self.failUnlessEqual(segsize, 6)
1678             self.failUnlessEqual(datalen, 36)
1679         d.addCallback(_check_encoding_parameters)
1680
1681         d.addCallback(lambda ignored:
1682             mr.get_checkstring())
1683         d.addCallback(lambda checkstring:
1684             self.failUnlessEqual(checkstring, self.checkstring))
1685         return d
1686
1687
1688     def test_read_with_different_tail_segment_size(self):
1689         self.write_test_share_to_server("si1", tail_segment=True)
1690         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1691         d = mr.get_block_and_salt(5)
1692         def _check_tail_segment(results):
1693             block, salt = results
1694             self.failUnlessEqual(len(block), 1)
1695             self.failUnlessEqual(block, "a")
1696         d.addCallback(_check_tail_segment)
1697         return d
1698
1699
1700     def test_get_block_with_invalid_segnum(self):
1701         self.write_test_share_to_server("si1")
1702         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1703         d = defer.succeed(None)
1704         d.addCallback(lambda ignored:
1705             self.shouldFail(LayoutInvalid, "test invalid segnum",
1706                             None,
1707                             mr.get_block_and_salt, 7))
1708         return d
1709
1710
1711     def test_get_encoding_parameters_first(self):
1712         self.write_test_share_to_server("si1")
1713         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1714         d = mr.get_encoding_parameters()
1715         def _check_encoding_parameters((k, n, segment_size, datalen)):
1716             self.failUnlessEqual(k, 3)
1717             self.failUnlessEqual(n, 10)
1718             self.failUnlessEqual(segment_size, 6)
1719             self.failUnlessEqual(datalen, 36)
1720         d.addCallback(_check_encoding_parameters)
1721         return d
1722
1723
1724     def test_get_seqnum_first(self):
1725         self.write_test_share_to_server("si1")
1726         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1727         d = mr.get_seqnum()
1728         d.addCallback(lambda seqnum:
1729             self.failUnlessEqual(seqnum, 0))
1730         return d
1731
1732
1733     def test_get_root_hash_first(self):
1734         self.write_test_share_to_server("si1")
1735         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1736         d = mr.get_root_hash()
1737         d.addCallback(lambda root_hash:
1738             self.failUnlessEqual(root_hash, self.root_hash))
1739         return d
1740
1741
1742     def test_get_checkstring_first(self):
1743         self.write_test_share_to_server("si1")
1744         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1745         d = mr.get_checkstring()
1746         d.addCallback(lambda checkstring:
1747             self.failUnlessEqual(checkstring, self.checkstring))
1748         return d
1749
1750
1751     def test_write_read_vectors(self):
1752         # When we write, the storage server returns the results of our
1753         # read vector along with the write result. If a write fails
1754         # because the test vectors failed, this read vector can help us
1755         # diagnose the problem. This test ensures that the read vector
1756         # is working appropriately.
1757         mw = self._make_new_mw("si1", 0)
1758
1759         for i in xrange(6):
1760             mw.put_block(self.block, i, self.salt)
1761         mw.put_encprivkey(self.encprivkey)
1762         mw.put_blockhashes(self.block_hash_tree)
1763         mw.put_sharehashes(self.share_hash_chain)
1764         mw.put_root_hash(self.root_hash)
1765         mw.put_signature(self.signature)
1766         mw.put_verification_key(self.verification_key)
1767         d = mw.finish_publishing()
1768         def _then(results):
1769             self.failUnlessEqual(len(results), 2)
1770             result, readv = results
1771             self.failUnless(result)
1772             self.failIf(readv)
1773             self.old_checkstring = mw.get_checkstring()
1774             mw.set_checkstring("")
1775         d.addCallback(_then)
1776         d.addCallback(lambda ignored:
1777             mw.finish_publishing())
1778         def _then_again(results):
1779             self.failUnlessEqual(len(results), 2)
1780             result, readvs = results
1781             self.failIf(result)
1782             self.failUnlessIn(0, readvs)
1783             readv = readvs[0][0]
1784             self.failUnlessEqual(readv, self.old_checkstring)
1785         d.addCallback(_then_again)
1786         # The checkstring remains the same for the rest of the process.
1787         return d
1788
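    # (Editorial aside.) The checkstring manipulated above is just the
    # packed (version, seqnum, root-hash) header prefix, so a failed
    # write hands back the prefix of whatever share is already in place:
    #
    #     checkstring = struct.pack(">BQ32s", 1, 0, self.root_hash)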
1789
1790     def test_private_key_after_share_hash_chain(self):
1791         mw = self._make_new_mw("si1", 0)
1792         d = defer.succeed(None)
1793         for i in xrange(6):
1794             d.addCallback(lambda ignored, i=i:
1795                 mw.put_block(self.block, i, self.salt))
1796         d.addCallback(lambda ignored:
1797             mw.put_encprivkey(self.encprivkey))
1798         d.addCallback(lambda ignored:
1799             mw.put_sharehashes(self.share_hash_chain))
1800
1801         # Now try to put the private key again.
1802         d.addCallback(lambda ignored:
1803             self.shouldFail(LayoutInvalid, "test repeat private key",
1804                             None,
1805                             mw.put_encprivkey, self.encprivkey))
1806         return d
1807
1808
1809     def test_signature_after_verification_key(self):
1810         mw = self._make_new_mw("si1", 0)
1811         d = defer.succeed(None)
1812         # Put everything up to and including the verification key.
1813         for i in xrange(6):
1814             d.addCallback(lambda ignored, i=i:
1815                 mw.put_block(self.block, i, self.salt))
1816         d.addCallback(lambda ignored:
1817             mw.put_encprivkey(self.encprivkey))
1818         d.addCallback(lambda ignored:
1819             mw.put_blockhashes(self.block_hash_tree))
1820         d.addCallback(lambda ignored:
1821             mw.put_sharehashes(self.share_hash_chain))
1822         d.addCallback(lambda ignored:
1823             mw.put_root_hash(self.root_hash))
1824         d.addCallback(lambda ignored:
1825             mw.put_signature(self.signature))
1826         d.addCallback(lambda ignored:
1827             mw.put_verification_key(self.verification_key))
1828         # Now try to put the signature again. This should fail
1829         d.addCallback(lambda ignored:
1830             self.shouldFail(LayoutInvalid, "signature after verification",
1831                             None,
1832                             mw.put_signature, self.signature))
1833         return d
1834
1835
1836     def test_uncoordinated_write(self):
1837         # Make two mutable writers, both pointing to the same storage
1838         # server, both at the same storage index, and try writing to the
1839         # same share.
1840         mw1 = self._make_new_mw("si1", 0)
1841         mw2 = self._make_new_mw("si1", 0)
1842
1843         def _check_success(results):
1844             result, readvs = results
1845             self.failUnless(result)
1846
1847         def _check_failure(results):
1848             result, readvs = results
1849             self.failIf(result)
1850
1851         def _write_share(mw):
1852             for i in xrange(6):
1853                 mw.put_block(self.block, i, self.salt)
1854             mw.put_encprivkey(self.encprivkey)
1855             mw.put_blockhashes(self.block_hash_tree)
1856             mw.put_sharehashes(self.share_hash_chain)
1857             mw.put_root_hash(self.root_hash)
1858             mw.put_signature(self.signature)
1859             mw.put_verification_key(self.verification_key)
1860             return mw.finish_publishing()
1861         d = _write_share(mw1)
1862         d.addCallback(_check_success)
1863         d.addCallback(lambda ignored:
1864             _write_share(mw2))
1865         d.addCallback(_check_failure)
1866         return d
1867
1868
1869     def test_invalid_salt_size(self):
1870         # Salts need to be 16 bytes in size. Writes that attempt to
1871         # write more or less than this should be rejected.
1872         mw = self._make_new_mw("si1", 0)
1873         invalid_salt = "a" * 17 # 17 bytes
1874         another_invalid_salt = "b" * 15 # 15 bytes
1875         d = defer.succeed(None)
1876         d.addCallback(lambda ignored:
1877             self.shouldFail(LayoutInvalid, "salt too big",
1878                             None,
1879                             mw.put_block, self.block, 0, invalid_salt))
1880         d.addCallback(lambda ignored:
1881             self.shouldFail(LayoutInvalid, "salt too small",
1882                             None,
1883                             mw.put_block, self.block, 0,
1884                             another_invalid_salt))
1885         return d
1886
1887
1888     def test_write_test_vectors(self):
1889         # If we give the write proxy a bogus test vector at
1890         # any point during the process, it should fail to write when we
1891         # tell it to write.
1892         def _check_failure(results):
1893             self.failUnlessEqual(len(results), 2)
1894             res, readv = results
1895             self.failIf(res)
1896
1897         def _check_success(results):
1898             self.failUnlessEqual(len(results), 2)
1899             res, readv = results
1900             self.failUnless(res)
1901
1902         mw = self._make_new_mw("si1", 0)
1903         mw.set_checkstring("this is a lie")
1904         for i in xrange(6):
1905             mw.put_block(self.block, i, self.salt)
1906         mw.put_encprivkey(self.encprivkey)
1907         mw.put_blockhashes(self.block_hash_tree)
1908         mw.put_sharehashes(self.share_hash_chain)
1909         mw.put_root_hash(self.root_hash)
1910         mw.put_signature(self.signature)
1911         mw.put_verification_key(self.verification_key)
1912         d = mw.finish_publishing()
1913         d.addCallback(_check_failure)
1914         d.addCallback(lambda ignored:
1915             mw.set_checkstring(""))
1916         d.addCallback(lambda ignored:
1917             mw.finish_publishing())
1918         d.addCallback(_check_success)
1919         return d
1920
1921
1922     def serialize_blockhashes(self, blockhashes):
1923         return "".join(blockhashes)
1924
1925
1926     def serialize_sharehashes(self, sharehashes):
1927         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1928                         for i in sorted(sharehashes.keys())])
1929         return ret
1930
1931
1932     def test_write(self):
1933         # The default writer gives a file with six 6-byte segments and
1934         # 2-byte blocks.
1935         mw = self._make_new_mw("si1", 0)
1936         # Test writing some blocks.
1937         read = self.ss.remote_slot_readv
1938         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1939         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1940                                     PRIVATE_KEY_SIZE + \
1941                                     SIGNATURE_SIZE + \
1942                                     VERIFICATION_KEY_SIZE + \
1943                                     SHARE_HASH_CHAIN_SIZE
1944         written_block_size = 2 + len(self.salt)
1945         written_block = self.block + self.salt
1946         for i in xrange(6):
1947             mw.put_block(self.block, i, self.salt)
1948
1949         mw.put_encprivkey(self.encprivkey)
1950         mw.put_blockhashes(self.block_hash_tree)
1951         mw.put_sharehashes(self.share_hash_chain)
1952         mw.put_root_hash(self.root_hash)
1953         mw.put_signature(self.signature)
1954         mw.put_verification_key(self.verification_key)
1955         d = mw.finish_publishing()
1956         def _check_publish(results):
1957             self.failUnlessEqual(len(results), 2)
1958             result, ign = results
1959             self.failUnless(result, "publish failed")
1960             for i in xrange(6):
1961                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1962                                 {0: [written_block]})
1963
1964             self.failUnlessEqual(len(self.encprivkey), 7)
1965             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1966                                  {0: [self.encprivkey]})
1967
1968             expected_block_hash_offset = expected_sharedata_offset + \
1969                         (6 * written_block_size)
1970             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1971             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1972                                  {0: [self.block_hash_tree_s]})
1973
1974             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1975             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1976                                  {0: [self.share_hash_chain_s]})
1977
1978             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1979                                  {0: [self.root_hash]})
1980             expected_signature_offset = expected_share_hash_offset + \
1981                 len(self.share_hash_chain_s)
1982             self.failUnlessEqual(len(self.signature), 9)
1983             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1984                                  {0: [self.signature]})
1985
1986             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1987             self.failUnlessEqual(len(self.verification_key), 6)
1988             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1989                                  {0: [self.verification_key]})
1990
1991             signable = mw.get_signable()
1992             verno, seq, roothash, k, n, segsize, datalen = \
1993                                             struct.unpack(">BQ32sBBQQ",
1994                                                           signable)
1995             self.failUnlessEqual(verno, 1)
1996             self.failUnlessEqual(seq, 0)
1997             self.failUnlessEqual(roothash, self.root_hash)
1998             self.failUnlessEqual(k, 3)
1999             self.failUnlessEqual(n, 10)
2000             self.failUnlessEqual(segsize, 6)
2001             self.failUnlessEqual(datalen, 36)
2002             expected_eof_offset = expected_block_hash_offset + \
2003                 len(self.block_hash_tree_s)
2004
2005             # Check the version number to make sure that it is correct.
2006             expected_version_number = struct.pack(">B", 1)
2007             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2008                                  {0: [expected_version_number]})
2009             # Check the sequence number to make sure that it is correct
2010             expected_sequence_number = struct.pack(">Q", 0)
2011             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2012                                  {0: [expected_sequence_number]})
2013             # Check that the encoding parameters (k, N, segment size, data
2014             # length) are what they should be. These are 3, 10, 6, 36.
2015             expected_k = struct.pack(">B", 3)
2016             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2017                                  {0: [expected_k]})
2018             expected_n = struct.pack(">B", 10)
2019             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2020                                  {0: [expected_n]})
2021             expected_segment_size = struct.pack(">Q", 6)
2022             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2023                                  {0: [expected_segment_size]})
2024             expected_data_length = struct.pack(">Q", 36)
2025             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2026                                  {0: [expected_data_length]})
2027             expected_offset = struct.pack(">Q", expected_private_key_offset)
2028             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2029                                  {0: [expected_offset]})
2030             expected_offset = struct.pack(">Q", expected_share_hash_offset)
2031             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2032                                  {0: [expected_offset]})
2033             expected_offset = struct.pack(">Q", expected_signature_offset)
2034             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2035                                  {0: [expected_offset]})
2036             expected_offset = struct.pack(">Q", expected_verification_key_offset)
2037             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2038                                  {0: [expected_offset]})
2039             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2040             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2041                                  {0: [expected_offset]})
2042             expected_offset = struct.pack(">Q", expected_sharedata_offset)
2043             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2044                                  {0: [expected_offset]})
2045             expected_offset = struct.pack(">Q", expected_block_hash_offset)
2046             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
2047                                  {0: [expected_offset]})
2048             expected_offset = struct.pack(">Q", expected_eof_offset)
2049             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
2050                                  {0: [expected_offset]})
2051         d.addCallback(_check_publish)
2052         return d
2053
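    # (Editorial recap of the offset arithmetic _check_publish relies on,
    # using the fixed-size header regions imported above.)
    #
    #     header_size = struct.calcsize(MDMFHEADER)
    #     privkey_offset = header_size
    #     sharedata_offset = (header_size + PRIVATE_KEY_SIZE +
    #                         SIGNATURE_SIZE + VERIFICATION_KEY_SIZE +
    #                         SHARE_HASH_CHAIN_SIZE)
    #     blockhashes_offset = sharedata_offset + 6 * (2 + 16)  # block+salt
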
2054     def _make_new_mw(self, si, share, datalength=36):
2055         # By default this is a file of size 36 bytes. Since it has a
2056         # segment size of 6, we know that it has six 6-byte segments,
2057         # which will be split into blocks of 2 bytes because our FEC k
2058         # parameter is 3.
2059         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2060                                 6, datalength)
2061         return mw
2062
2063
2064     def test_write_rejected_with_too_many_blocks(self):
2065         mw = self._make_new_mw("si0", 0)
2066
2067         # Try writing too many blocks. We should not be able to write
2068         # more than 6 blocks into each share.
2070         d = defer.succeed(None)
2071         for i in xrange(6):
2072             d.addCallback(lambda ignored, i=i:
2073                 mw.put_block(self.block, i, self.salt))
2074         d.addCallback(lambda ignored:
2075             self.shouldFail(LayoutInvalid, "too many blocks",
2076                             None,
2077                             mw.put_block, self.block, 7, self.salt))
2078         return d
2079
2080
2081     def test_write_rejected_with_invalid_salt(self):
2082         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2083         # less should cause an error.
2084         mw = self._make_new_mw("si1", 0)
2085         bad_salt = "a" * 17 # 17 bytes
2086         d = defer.succeed(None)
2087         d.addCallback(lambda ignored:
2088             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2089                             None, mw.put_block, self.block, 7, bad_salt))
2090         return d
2091
2092
2093     def test_write_rejected_with_invalid_root_hash(self):
2094         # Try writing an invalid root hash. This should be SHA256d, and
2095         # 32 bytes long as a result.
2096         mw = self._make_new_mw("si2", 0)
2097         # 17 bytes != 32 bytes
2098         invalid_root_hash = "a" * 17
2099         d = defer.succeed(None)
2100         # Before this test can work, we need to put some blocks + salts,
2101         # a block hash tree, and a share hash tree. Otherwise, we'll see
2102         # failures that match what we are looking for, but are caused by
2103         # the constraints imposed on operation ordering.
2104         for i in xrange(6):
2105             d.addCallback(lambda ignored, i=i:
2106                 mw.put_block(self.block, i, self.salt))
2107         d.addCallback(lambda ignored:
2108             mw.put_encprivkey(self.encprivkey))
2109         d.addCallback(lambda ignored:
2110             mw.put_blockhashes(self.block_hash_tree))
2111         d.addCallback(lambda ignored:
2112             mw.put_sharehashes(self.share_hash_chain))
2113         d.addCallback(lambda ignored:
2114             self.shouldFail(LayoutInvalid, "invalid root hash",
2115                             None, mw.put_root_hash, invalid_root_hash))
2116         return d
2117
2118
2119     def test_write_rejected_with_invalid_blocksize(self):
2120         # The blocksize implied by the writer that we get from
2121         # _make_new_mw is 2 bytes -- anything more or less than this
2122         # should cause a failure, unless it is the tail segment, in
2123         # which case a smaller block may be valid.
2124         invalid_block = "a"
2125         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2126                                              # one byte blocks
2127         # 1 byte != 2 bytes
2128         d = defer.succeed(None)
2129         d.addCallback(lambda ignored, invalid_block=invalid_block:
2130             self.shouldFail(LayoutInvalid, "test blocksize too small",
2131                             None, mw.put_block, invalid_block, 0,
2132                             self.salt))
2133         invalid_block = invalid_block * 3
2134         # 3 bytes != 2 bytes
2135         d.addCallback(lambda ignored:
2136             self.shouldFail(LayoutInvalid, "test blocksize too large",
2137                             None,
2138                             mw.put_block, invalid_block, 0, self.salt))
2139         for i in xrange(5):
2140             d.addCallback(lambda ignored, i=i:
2141                 mw.put_block(self.block, i, self.salt))
2142         # Try to put an invalid tail segment
2143         d.addCallback(lambda ignored:
2144             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2145                             None,
2146                             mw.put_block, self.block, 5, self.salt))
2147         valid_block = "a"
2148         d.addCallback(lambda ignored:
2149             mw.put_block(valid_block, 5, self.salt))
2150         return d
2151
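    # (Editorial arithmetic for the tail-segment case above.) A
    # datalength of 33 with 6-byte segments gives five full segments
    # plus a 3-byte tail, and with k = 3 the tail encodes into
    # 3 // 3 = 1-byte blocks -- which is why put_block accepts the
    # one-byte block only for segment 5.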
2152
2153     def test_write_enforces_order_constraints(self):
2154         # We require that the MDMFSlotWriteProxy be interacted with in a
2155         # specific way.
2156         # That way is:
2157         # 0: __init__
2158         # 1: write blocks and salts
2159         # 2: Write the encrypted private key
2160         # 3: Write the block hashes
2161         # 4: Write the share hashes
2162         # 5: Write the root hash and salt hash
2163         # 6: Write the signature and verification key
2164         # 7: Write the file.
2165         #
2166         # Some of these can be performed out-of-order, and some can't.
2167         # The dependencies that I want to test here are:
2168         #  - Private key before block hashes
2169         #  - share hashes and block hashes before root hash
2170         #  - root hash before signature
2171         #  - signature before verification key
2172         mw0 = self._make_new_mw("si0", 0)
2173         # Write some shares
2174         d = defer.succeed(None)
2175         for i in xrange(6):
2176             d.addCallback(lambda ignored, i=i:
2177                 mw0.put_block(self.block, i, self.salt))
2178
2179         # Try to write the share hash chain without writing the
2180         # encrypted private key
2181         d.addCallback(lambda ignored:
2182             self.shouldFail(LayoutInvalid, "share hash chain before "
2183                                            "private key",
2184                             None,
2185                             mw0.put_sharehashes, self.share_hash_chain))
2186         # Write the private key.
2187         d.addCallback(lambda ignored:
2188             mw0.put_encprivkey(self.encprivkey))
2189
2190         # Now write the block hashes and try again
2191         d.addCallback(lambda ignored:
2192             mw0.put_blockhashes(self.block_hash_tree))
2193
2194         # We haven't yet put the root hash on the share, so we shouldn't
2195         # be able to sign it.
2196         d.addCallback(lambda ignored:
2197             self.shouldFail(LayoutInvalid, "signature before root hash",
2198                             None, mw0.put_signature, self.signature))
2199
2200         d.addCallback(lambda ignored:
2201             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2202
2203         # ..and, since that fails, we also shouldn't be able to put the
2204         # verification key.
2205         d.addCallback(lambda ignored:
2206             self.shouldFail(LayoutInvalid, "key before signature",
2207                             None, mw0.put_verification_key,
2208                             self.verification_key))
2209
2210         # Now write the share hashes.
2211         d.addCallback(lambda ignored:
2212             mw0.put_sharehashes(self.share_hash_chain))
2213         # We should be able to write the root hash now too
2214         d.addCallback(lambda ignored:
2215             mw0.put_root_hash(self.root_hash))
2216
2217         # We should still be unable to put the verification key
2218         d.addCallback(lambda ignored:
2219             self.shouldFail(LayoutInvalid, "key before signature",
2220                             None, mw0.put_verification_key,
2221                             self.verification_key))
2222
2223         d.addCallback(lambda ignored:
2224             mw0.put_signature(self.signature))
2225
2226         # We shouldn't be able to write the offsets to the remote server
2227         # until the offset table is finished; IOW, until we have written
2228         # the verification key.
2229         d.addCallback(lambda ignored:
2230             self.shouldFail(LayoutInvalid, "offsets before verification key",
2231                             None,
2232                             mw0.finish_publishing))
2233
2234         d.addCallback(lambda ignored:
2235             mw0.put_verification_key(self.verification_key))
2236         return d
2237
2238
2239     def test_end_to_end(self):
2240         mw = self._make_new_mw("si1", 0)
2241         # Write a share using the mutable writer, and make sure that the
2242         # reader knows how to read everything back to us.
2243         d = defer.succeed(None)
2244         for i in xrange(6):
2245             d.addCallback(lambda ignored, i=i:
2246                 mw.put_block(self.block, i, self.salt))
2247         d.addCallback(lambda ignored:
2248             mw.put_encprivkey(self.encprivkey))
2249         d.addCallback(lambda ignored:
2250             mw.put_blockhashes(self.block_hash_tree))
2251         d.addCallback(lambda ignored:
2252             mw.put_sharehashes(self.share_hash_chain))
2253         d.addCallback(lambda ignored:
2254             mw.put_root_hash(self.root_hash))
2255         d.addCallback(lambda ignored:
2256             mw.put_signature(self.signature))
2257         d.addCallback(lambda ignored:
2258             mw.put_verification_key(self.verification_key))
2259         d.addCallback(lambda ignored:
2260             mw.finish_publishing())
2261
2262         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2263         def _check_block_and_salt((block, salt)):
2264             self.failUnlessEqual(block, self.block)
2265             self.failUnlessEqual(salt, self.salt)
2266
2267         for i in xrange(6):
2268             d.addCallback(lambda ignored, i=i:
2269                 mr.get_block_and_salt(i))
2270             d.addCallback(_check_block_and_salt)
2271
2272         d.addCallback(lambda ignored:
2273             mr.get_encprivkey())
2274         d.addCallback(lambda encprivkey:
2275             self.failUnlessEqual(self.encprivkey, encprivkey))
2276
2277         d.addCallback(lambda ignored:
2278             mr.get_blockhashes())
2279         d.addCallback(lambda blockhashes:
2280             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2281
2282         d.addCallback(lambda ignored:
2283             mr.get_sharehashes())
2284         d.addCallback(lambda sharehashes:
2285             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2286
2287         d.addCallback(lambda ignored:
2288             mr.get_signature())
2289         d.addCallback(lambda signature:
2290             self.failUnlessEqual(signature, self.signature))
2291
2292         d.addCallback(lambda ignored:
2293             mr.get_verification_key())
2294         d.addCallback(lambda verification_key:
2295             self.failUnlessEqual(verification_key, self.verification_key))
2296
2297         d.addCallback(lambda ignored:
2298             mr.get_seqnum())
2299         d.addCallback(lambda seqnum:
2300             self.failUnlessEqual(seqnum, 0))
2301
2302         d.addCallback(lambda ignored:
2303             mr.get_root_hash())
2304         d.addCallback(lambda root_hash:
2305             self.failUnlessEqual(self.root_hash, root_hash))
2306
2307         d.addCallback(lambda ignored:
2308             mr.get_encoding_parameters())
2309         def _check_encoding_parameters((k, n, segsize, datalen)):
2310             self.failUnlessEqual(k, 3)
2311             self.failUnlessEqual(n, 10)
2312             self.failUnlessEqual(segsize, 6)
2313             self.failUnlessEqual(datalen, 36)
2314         d.addCallback(_check_encoding_parameters)
2315
2316         d.addCallback(lambda ignored:
2317             mr.get_checkstring())
2318         d.addCallback(lambda checkstring:
2319             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2320         return d
2321
2322
2323     def test_is_sdmf(self):
2324         # The MDMFSlotReadProxy should also know how to read SDMF files,
2325         # since it will encounter them on the grid. Callers use the
2326         # is_sdmf method to test this.
2327         self.write_sdmf_share_to_server("si1")
2328         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2329         d = mr.is_sdmf()
2330         d.addCallback(lambda issdmf:
2331             self.failUnless(issdmf))
2332         return d
2333
2334
2335     def test_reads_sdmf(self):
2336         # The slot read proxy should, naturally, know how to tell us
2337         # about data in the SDMF format
2338         self.write_sdmf_share_to_server("si1")
2339         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2340         d = defer.succeed(None)
2341         d.addCallback(lambda ignored:
2342             mr.is_sdmf())
2343         d.addCallback(lambda issdmf:
2344             self.failUnless(issdmf))
2345
2346         # What do we need to read?
2347         #  - The sharedata
2348         #  - The salt
2349         d.addCallback(lambda ignored:
2350             mr.get_block_and_salt(0))
2351         def _check_block_and_salt(results):
2352             block, salt = results
2353             # Our original file is 36 bytes long, so each share is 12
2354             # bytes in size. The share is composed entirely of the
2355             # letter a. self.block contains two a's, so 6 * self.block
2356             # is what we are looking for.
2357             self.failUnlessEqual(block, self.block * 6)
2358             self.failUnlessEqual(salt, self.salt)
2359         d.addCallback(_check_block_and_salt)
2360
2361         #  - The blockhashes
2362         d.addCallback(lambda ignored:
2363             mr.get_blockhashes())
2364         d.addCallback(lambda blockhashes:
2365             self.failUnlessEqual(self.block_hash_tree,
2366                                  blockhashes,
2367                                  blockhashes))
2368         #  - The sharehashes
2369         d.addCallback(lambda ignored:
2370             mr.get_sharehashes())
2371         d.addCallback(lambda sharehashes:
2372             self.failUnlessEqual(self.share_hash_chain,
2373                                  sharehashes))
2374         #  - The keys
2375         d.addCallback(lambda ignored:
2376             mr.get_encprivkey())
2377         d.addCallback(lambda encprivkey:
2378             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2379         d.addCallback(lambda ignored:
2380             mr.get_verification_key())
2381         d.addCallback(lambda verification_key:
2382             self.failUnlessEqual(verification_key,
2383                                  self.verification_key,
2384                                  verification_key))
2385         #  - The signature
2386         d.addCallback(lambda ignored:
2387             mr.get_signature())
2388         d.addCallback(lambda signature:
2389             self.failUnlessEqual(signature, self.signature, signature))
2390
2391         #  - The sequence number
2392         d.addCallback(lambda ignored:
2393             mr.get_seqnum())
2394         d.addCallback(lambda seqnum:
2395             self.failUnlessEqual(seqnum, 0, seqnum))
2396
2397         #  - The root hash
2398         d.addCallback(lambda ignored:
2399             mr.get_root_hash())
2400         d.addCallback(lambda root_hash:
2401             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2402         return d
2403
2404
2405     def test_only_reads_one_segment_sdmf(self):
2406         # SDMF shares have only one segment, so it doesn't make sense to
2407         # read more segments than that. The reader should know this and
2408         # complain if we try to do that.
2409         self.write_sdmf_share_to_server("si1")
2410         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2411         d = defer.succeed(None)
2412         d.addCallback(lambda ignored:
2413             mr.is_sdmf())
2414         d.addCallback(lambda issdmf:
2415             self.failUnless(issdmf))
2416         d.addCallback(lambda ignored:
2417             self.shouldFail(LayoutInvalid, "test bad segment",
2418                             None,
2419                             mr.get_block_and_salt, 1))
2420         return d
2421
2422
2423     def test_read_with_prefetched_mdmf_data(self):
2424         # The MDMFSlotReadProxy will prefill certain fields if you pass
2425         # it data that you have already fetched. This is useful for
2426         # cases like the Servermap, which prefetches ~2kb of data while
2427         # finding out which shares are on the remote peer so that it
2428         # doesn't waste round trips.
2429         mdmf_data = self.build_test_mdmf_share()
2430         self.write_test_share_to_server("si1")
2431         def _make_mr(ignored, length):
2432             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2433             return mr
2434
2435         d = defer.succeed(None)
2436         # This should be enough to fill in both the encoding parameters
2437         # and the table of offsets, which will complete the version
2438         # information tuple.
2439         d.addCallback(_make_mr, 123)
2440         d.addCallback(lambda mr:
2441             mr.get_verinfo())
2442         def _check_verinfo(verinfo):
2443             self.failUnless(verinfo)
2444             self.failUnlessEqual(len(verinfo), 9)
2445             (seqnum,
2446              root_hash,
2447              salt_hash,
2448              segsize,
2449              datalen,
2450              k,
2451              n,
2452              prefix,
2453              offsets) = verinfo
2454             self.failUnlessEqual(seqnum, 0)
2455             self.failUnlessEqual(root_hash, self.root_hash)
2456             self.failUnlessEqual(segsize, 6)
2457             self.failUnlessEqual(datalen, 36)
2458             self.failUnlessEqual(k, 3)
2459             self.failUnlessEqual(n, 10)
2460             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2461                                           1,
2462                                           seqnum,
2463                                           root_hash,
2464                                           k,
2465                                           n,
2466                                           segsize,
2467                                           datalen)
2468             self.failUnlessEqual(expected_prefix, prefix)
2469             self.failUnlessEqual(self.rref.read_count, 0)
2470         d.addCallback(_check_verinfo)
2471         # This is not enough data to read a block and its salt, so the
2472         # wrapper should fetch them from the remote server.
2473         d.addCallback(_make_mr, 123)
2474         d.addCallback(lambda mr:
2475             mr.get_block_and_salt(0))
2476         def _check_block_and_salt((block, salt)):
2477             self.failUnlessEqual(block, self.block)
2478             self.failUnlessEqual(salt, self.salt)
2479             self.failUnlessEqual(self.rref.read_count, 1)
2480         # This should be enough data to read one block.
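             # (Rough accounting for the length: the 123-byte header and
             # offset table, then the encrypted private key, signature,
             # verification key, and share hash chain regions, plus ~140
             # bytes that presumably cover the block hash tree and the
             # first block with its salt.)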
2481         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2482         d.addCallback(lambda mr:
2483             mr.get_block_and_salt(0))
2484         d.addCallback(_check_block_and_salt)
2485         return d
2486
2487
2488     def test_read_with_prefetched_sdmf_data(self):
2489         sdmf_data = self.build_test_sdmf_share()
2490         self.write_sdmf_share_to_server("si1")
2491         def _make_mr(ignored, length):
2492             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2493             return mr
2494
2495         d = defer.succeed(None)
2496         # This should be enough to get us the encoding parameters,
2497         # offset table, and everything else we need to build the
2498         # verinfo tuple.
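             # (123 bytes is comfortably enough for SDMF, assuming the
             # standard layout: the signed prefix ">BQ32s16sBBQQ" packs to
             # 75 bytes, and the ">LLLLQQ" offset table adds 32 more.)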
2499         d.addCallback(_make_mr, 123)
2500         d.addCallback(lambda mr:
2501             mr.get_verinfo())
2502         def _check_verinfo(verinfo):
2503             self.failUnless(verinfo)
2504             self.failUnlessEqual(len(verinfo), 9)
2505             (seqnum,
2506              root_hash,
2507              salt,
2508              segsize,
2509              datalen,
2510              k,
2511              n,
2512              prefix,
2513              offsets) = verinfo
2514             self.failUnlessEqual(seqnum, 0)
2515             self.failUnlessEqual(root_hash, self.root_hash)
2516             self.failUnlessEqual(salt, self.salt)
2517             self.failUnlessEqual(segsize, 36)
2518             self.failUnlessEqual(datalen, 36)
2519             self.failUnlessEqual(k, 3)
2520             self.failUnlessEqual(n, 10)
2521             expected_prefix = struct.pack(SIGNED_PREFIX,
2522                                           0,
2523                                           seqnum,
2524                                           root_hash,
2525                                           salt,
2526                                           k,
2527                                           n,
2528                                           segsize,
2529                                           datalen)
2530             self.failUnlessEqual(expected_prefix, prefix)
2531             self.failUnlessEqual(self.rref.read_count, 0)
2532         d.addCallback(_check_verinfo)
2533         # This shouldn't be enough to read any share data.
2534         d.addCallback(_make_mr, 123)
2535         d.addCallback(lambda mr:
2536             mr.get_block_and_salt(0))
2537         def _check_block_and_salt((block, salt)):
2538             self.failUnlessEqual(block, self.block * 6)
2539             self.failUnlessEqual(salt, self.salt)
2540             # TODO: Fix the read routine so that it reads only the data
2541             #       that it has cached if it can't read all of it.
2542             self.failUnlessEqual(self.rref.read_count, 2)
2543
2544         # This should be enough to read share data.
2545         d.addCallback(_make_mr, self.offsets['share_data'])
2546         d.addCallback(lambda mr:
2547             mr.get_block_and_salt(0))
2548         d.addCallback(_check_block_and_salt)
2549         return d
2550
2551
2552     def test_read_with_empty_mdmf_file(self):
2553         # Some tests upload an empty file to exercise behavior that is
2554         # unrelated to content handling. The reader should cope with
2555         # these gracefully.
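             # (With datalen == 0 there are no segments at all, so metadata
             # queries should still succeed while any block fetch raises
             # LayoutInvalid, as the checks below verify.)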
2556         self.write_test_share_to_server("si1", empty=True)
2557         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2558         # We should be able to get the encoding parameters, and they
2559         # should be correct.
2560         d = defer.succeed(None)
2561         d.addCallback(lambda ignored:
2562             mr.get_encoding_parameters())
2563         def _check_encoding_parameters(params):
2564             self.failUnlessEqual(len(params), 4)
2565             k, n, segsize, datalen = params
2566             self.failUnlessEqual(k, 3)
2567             self.failUnlessEqual(n, 10)
2568             self.failUnlessEqual(segsize, 0)
2569             self.failUnlessEqual(datalen, 0)
2570         d.addCallback(_check_encoding_parameters)
2571
2572         # We should not be able to fetch a block, since there are no
2573         # blocks to fetch
2574         d.addCallback(lambda ignored:
2575             self.shouldFail(LayoutInvalid, "get block on empty file",
2576                             None,
2577                             mr.get_block_and_salt, 0))
2578         return d
2579
2580
2581     def test_read_with_empty_sdmf_file(self):
2582         self.write_sdmf_share_to_server("si1", empty=True)
2583         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2584         # We should be able to get the encoding parameters, and they
2585         # should be correct
2586         d = defer.succeed(None)
2587         d.addCallback(lambda ignored:
2588             mr.get_encoding_parameters())
2589         def _check_encoding_parameters(params):
2590             self.failUnlessEqual(len(params), 4)
2591             k, n, segsize, datalen = params
2592             self.failUnlessEqual(k, 3)
2593             self.failUnlessEqual(n, 10)
2594             self.failUnlessEqual(segsize, 0)
2595             self.failUnlessEqual(datalen, 0)
2596         d.addCallback(_check_encoding_parameters)
2597
2598         # An empty SDMF file has no blocks, so fetching one should
2599         # fail.
2600         d.addCallback(lambda ignored:
2601             self.shouldFail(LayoutInvalid, "get block on an empty file",
2602                             None,
2603                             mr.get_block_and_salt, 0))
2604         return d
2605
2606
2607     def test_verinfo_with_sdmf_file(self):
2608         self.write_sdmf_share_to_server("si1")
2609         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2610         # We should be able to get the version information.
2611         d = defer.succeed(None)
2612         d.addCallback(lambda ignored:
2613             mr.get_verinfo())
2614         def _check_verinfo(verinfo):
2615             self.failUnless(verinfo)
2616             self.failUnlessEqual(len(verinfo), 9)
2617             (seqnum,
2618              root_hash,
2619              salt,
2620              segsize,
2621              datalen,
2622              k,
2623              n,
2624              prefix,
2625              offsets) = verinfo
2626             self.failUnlessEqual(seqnum, 0)
2627             self.failUnlessEqual(root_hash, self.root_hash)
2628             self.failUnlessEqual(salt, self.salt)
2629             self.failUnlessEqual(segsize, 36)
2630             self.failUnlessEqual(datalen, 36)
2631             self.failUnlessEqual(k, 3)
2632             self.failUnlessEqual(n, 10)
2633             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2634                                           0,
2635                                           seqnum,
2636                                           root_hash,
2637                                           salt,
2638                                           k,
2639                                           n,
2640                                           segsize,
2641                                           datalen)
2642             self.failUnlessEqual(prefix, expected_prefix)
2643             self.failUnlessEqual(offsets, self.offsets)
2644         d.addCallback(_check_verinfo)
2645         return d
2646
2647
2648     def test_verinfo_with_mdmf_file(self):
2649         self.write_test_share_to_server("si1")
2650         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2651         d = defer.succeed(None)
2652         d.addCallback(lambda ignored:
2653             mr.get_verinfo())
2654         def _check_verinfo(verinfo):
2655             self.failUnless(verinfo)
2656             self.failUnlessEqual(len(verinfo), 9)
2657             (seqnum,
2658              root_hash,
2659              IV,
2660              segsize,
2661              datalen,
2662              k,
2663              n,
2664              prefix,
2665              offsets) = verinfo
2666             self.failUnlessEqual(seqnum, 0)
2667             self.failUnlessEqual(root_hash, self.root_hash)
2668             self.failIf(IV)
2669             self.failUnlessEqual(segsize, 6)
2670             self.failUnlessEqual(datalen, 36)
2671             self.failUnlessEqual(k, 3)
2672             self.failUnlessEqual(n, 10)
2673             expected_prefix = struct.pack(">BQ32s BBQQ",
2674                                           1,
2675                                           seqnum,
2676                                           root_hash,
2677                                           k,
2678                                           n,
2679                                           segsize,
2680                                           datalen)
2681             self.failUnlessEqual(prefix, expected_prefix)
2682             self.failUnlessEqual(offsets, self.offsets)
2683         d.addCallback(_check_verinfo)
2684         return d
2685
2686
2687     def test_sdmf_writer(self):
2688         # Go through the motions of writing an SDMF share to the storage
2689         # server. Then read the storage server to see that the share got
2690         # written in the way that we think it should have.
2691
2692         # We build the share data first so that the necessary instance
2693         # variables get set the way we want them for the checks below.
2694         data = self.build_test_sdmf_share()
2695         sdmfr = SDMFSlotWriteProxy(0,
2696                                    self.rref,
2697                                    "si1",
2698                                    self.secrets,
2699                                    0, 3, 10, 36, 36)
2700         # Put the block and salt.
2701         sdmfr.put_block(self.blockdata, 0, self.salt)
2702
2703         # Put the encprivkey
2704         sdmfr.put_encprivkey(self.encprivkey)
2705
2706         # Put the block and share hash chains
2707         sdmfr.put_blockhashes(self.block_hash_tree)
2708         sdmfr.put_sharehashes(self.share_hash_chain)
2709         sdmfr.put_root_hash(self.root_hash)
2710
2711         # Put the signature
2712         sdmfr.put_signature(self.signature)
2713
2714         # Put the verification key
2715         sdmfr.put_verification_key(self.verification_key)
2716
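             # (Design note: SDMFSlotWriteProxy buffers every put_* call
             # locally and ships the whole share in one batched write when
             # finish_publishing() runs, which is why write_count is still
             # zero here and becomes exactly 1 afterwards.)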
2717         # Now check to make sure that nothing has been written yet.
2718         self.failUnlessEqual(self.rref.write_count, 0)
2719
2720         # Now finish publishing
2721         d = sdmfr.finish_publishing()
2722         def _then(ignored):
2723             self.failUnlessEqual(self.rref.write_count, 1)
2724             read = self.ss.remote_slot_readv
2725             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2726                                  {0: [data]})
2727         d.addCallback(_then)
2728         return d
2729
2730
2731     def test_sdmf_writer_preexisting_share(self):
2732         data = self.build_test_sdmf_share()
2733         self.write_sdmf_share_to_server("si1")
2734
2735         # Now there is a share on the storage server. To successfully
2736         # write, we need to set the checkstring correctly. When we
2737         # don't, no write should occur.
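             # (Roughly: with no checkstring set, the proxy's test vector
             # insists the slot be empty, so the first finish_publishing()
             # below fails against the preexisting share; the failure
             # response carries the existing share's checkstring, which we
             # install before the successful retry.)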
2738         sdmfw = SDMFSlotWriteProxy(0,
2739                                    self.rref,
2740                                    "si1",
2741                                    self.secrets,
2742                                    1, 3, 10, 36, 36)
2743         sdmfw.put_block(self.blockdata, 0, self.salt)
2744
2745         # Put the encprivkey
2746         sdmfw.put_encprivkey(self.encprivkey)
2747
2748         # Put the block and share hash chains
2749         sdmfw.put_blockhashes(self.block_hash_tree)
2750         sdmfw.put_sharehashes(self.share_hash_chain)
2751
2752         # Put the root hash
2753         sdmfw.put_root_hash(self.root_hash)
2754
2755         # Put the signature
2756         sdmfw.put_signature(self.signature)
2757
2758         # Put the verification key
2759         sdmfw.put_verification_key(self.verification_key)
2760
2761         # We shouldn't have a checkstring yet
2762         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2763
2764         d = sdmfw.finish_publishing()
2765         def _then(results):
2766             self.failIf(results[0])
2767             # this is the correct checkstring
2768             self._expected_checkstring = results[1][0][0]
2769             return self._expected_checkstring
2770
2771         d.addCallback(_then)
2772         d.addCallback(sdmfw.set_checkstring)
2773         d.addCallback(lambda ignored:
2774             sdmfw.get_checkstring())
2775         d.addCallback(lambda checkstring:
2776             self.failUnlessEqual(checkstring, self._expected_checkstring))
2777         d.addCallback(lambda ignored:
2778             sdmfw.finish_publishing())
2779         def _then_again(results):
2780             self.failUnless(results[0])
2781             read = self.ss.remote_slot_readv
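             # (Offset 1 skips the one-byte version field of the SDMF
             # prefix; the eight bytes after it hold the big-endian seqnum,
             # which this writer set to 1.)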
2782             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2783                                  {0: [struct.pack(">Q", 1)]})
2784             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2785                                  {0: [data[9:]]})
2786         d.addCallback(_then_again)
2787         return d
2788
2789
2790 class Stats(unittest.TestCase):
2791
2792     def setUp(self):
2793         self.sparent = LoggingServiceParent()
2794         self._lease_secret = itertools.count()
2795     def tearDown(self):
2796         return self.sparent.stopService()
2797
2798     def workdir(self, name):
2799         basedir = os.path.join("storage", "Server", name)
2800         return basedir
2801
2802     def create(self, name):
2803         workdir = self.workdir(name)
2804         ss = StorageServer(workdir, "\x00" * 20)
2805         ss.setServiceParent(self.sparent)
2806         return ss
2807
2808     def test_latencies(self):
2809         ss = self.create("test_latencies")
2810         for i in range(10000):
2811             ss.add_latency("allocate", 1.0 * i)
2812         for i in range(1000):
2813             ss.add_latency("renew", 1.0 * i)
2814         for i in range(20):
2815             ss.add_latency("write", 1.0 * i)
2816         for i in range(10):
2817             ss.add_latency("cancel", 2.0 * i)
2818         ss.add_latency("get", 5.0)
2819
2820         output = ss.get_latencies()
2821
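             # (add_latency keeps only the most recent 1000 samples per
             # category, so of the 10000 "allocate" events only the values
             # 9000..9999 survive, which is what gives the mean of 9500 and
             # the percentiles asserted below. Percentiles are read off the
             # sorted sample list, and a percentile is reported as None
             # when too few events have been recorded to make it
             # meaningful.)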
2822         self.failUnlessEqual(sorted(output.keys()),
2823                              sorted(["allocate", "renew", "cancel", "write", "get"]))
2824         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
2825         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
2826         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
2827         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
2828         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
2829         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
2830         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
2831         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
2832         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
2833
2834         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
2835         self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
2836         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
2837         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
2838         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
2839         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
2840         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
2841         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
2842         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
2843
2844         self.failUnlessEqual(len(ss.latencies["write"]), 20)
2845         self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
2846         self.failUnless(output["write"]["01_0_percentile"] is None, output)
2847         self.failUnless(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
2848         self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
2849         self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
2850         self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
2851         self.failUnless(output["write"]["99_0_percentile"] is None, output)
2852         self.failUnless(output["write"]["99_9_percentile"] is None, output)
2853
2854         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
2855         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
2856         self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
2857         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
2858         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
2859         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
2860         self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
2861         self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
2862         self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
2863
2864         self.failUnlessEqual(len(ss.latencies["get"]), 1)
2865         self.failUnless(output["get"]["mean"] is None, output)
2866         self.failUnless(output["get"]["01_0_percentile"] is None, output)
2867         self.failUnless(output["get"]["10_0_percentile"] is None, output)
2868         self.failUnless(output["get"]["50_0_percentile"] is None, output)
2869         self.failUnless(output["get"]["90_0_percentile"] is None, output)
2870         self.failUnless(output["get"]["95_0_percentile"] is None, output)
2871         self.failUnless(output["get"]["99_0_percentile"] is None, output)
2872         self.failUnless(output["get"]["99_9_percentile"] is None, output)
2873
2874 def remove_tags(s):
2875     s = re.sub(r'<[^>]*>', ' ', s)
2876     s = re.sub(r'\s+', ' ', s)
2877     return s
2878
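     # (MyBucketCountingCrawler pops one Deferred from hook_ds each time a
     # prefix finishes, letting test_bucket_counter_eta observe the crawler
     # at known points in its cycle.)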
2879 class MyBucketCountingCrawler(BucketCountingCrawler):
2880     def finished_prefix(self, cycle, prefix):
2881         BucketCountingCrawler.finished_prefix(self, cycle, prefix)
2882         if self.hook_ds:
2883             d = self.hook_ds.pop(0)
2884             d.callback(None)
2885
2886 class MyStorageServer(StorageServer):
2887     def add_bucket_counter(self):
2888         statefile = os.path.join(self.storedir, "bucket_counter.state")
2889         self.bucket_counter = MyBucketCountingCrawler(self, statefile)
2890         self.bucket_counter.setServiceParent(self)
2891
2892 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
2893
2894     def setUp(self):
2895         self.s = service.MultiService()
2896         self.s.startService()
2897     def tearDown(self):
2898         return self.s.stopService()
2899
2900     def test_bucket_counter(self):
2901         basedir = "storage/BucketCounter/bucket_counter"
2902         fileutil.make_dirs(basedir)
2903         ss = StorageServer(basedir, "\x00" * 20)
2904         # to make sure we capture the bucket-counting-crawler in the middle
2905         # of a cycle, we reach in and reduce its maximum slice time to 0. We
2906         # also make it start sooner than usual.
2907         ss.bucket_counter.slow_start = 0
2908         orig_cpu_slice = ss.bucket_counter.cpu_slice
2909         ss.bucket_counter.cpu_slice = 0
2910         ss.setServiceParent(self.s)
2911
2912         w = StorageStatus(ss)
2913
2914         # this sample is before the crawler has started doing anything
2915         html = w.renderSynchronously()
2916         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
2917         s = remove_tags(html)
2918         self.failUnlessIn("Accepting new shares: Yes", s)
2919         self.failUnlessIn("Reserved space: - 0 B (0)", s)
2920         self.failUnlessIn("Total buckets: Not computed yet", s)
2921         self.failUnlessIn("Next crawl in", s)
2922
2923         # give the bucket-counting-crawler one tick to get started. The
2924         # cpu_slice=0 will force it to yield right after it processes the
2925         # first prefix
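             # (fireEventually() returns a Deferred that fires on a later
             # reactor turn; _check below re-arms itself with another
             # fireEventually() until the crawler has recorded its first
             # completed prefix, a busy-poll written in Deferred form.)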
2926
2927         d = fireEventually()
2928         def _check(ignored):
2929             # are we really right after the first prefix?
2930             state = ss.bucket_counter.get_state()
2931             if state["last-complete-prefix"] is None:
2932                 d2 = fireEventually()
2933                 d2.addCallback(_check)
2934                 return d2
2935             self.failUnlessEqual(state["last-complete-prefix"],
2936                                  ss.bucket_counter.prefixes[0])
2937             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2938             html = w.renderSynchronously()
2939             s = remove_tags(html)
2940             self.failUnlessIn(" Current crawl ", s)
2941             self.failUnlessIn(" (next work in ", s)
2942         d.addCallback(_check)
2943
2944         # now give it enough time to complete a full cycle
2945         def _watch():
2946             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2947         d.addCallback(lambda ignored: self.poll(_watch))
2948         def _check2(ignored):
2949             ss.bucket_counter.cpu_slice = orig_cpu_slice
2950             html = w.renderSynchronously()
2951             s = remove_tags(html)
2952             self.failUnlessIn("Total buckets: 0 (the number of", s)
2953             self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
2954         d.addCallback(_check2)
2955         return d
2956
2957     def test_bucket_counter_cleanup(self):
2958         basedir = "storage/BucketCounter/bucket_counter_cleanup"
2959         fileutil.make_dirs(basedir)
2960         ss = StorageServer(basedir, "\x00" * 20)
2961         # to make sure we capture the bucket-counting-crawler in the middle
2962         # of a cycle, we reach in and reduce its maximum slice time to 0.
2963         ss.bucket_counter.slow_start = 0
2964         orig_cpu_slice = ss.bucket_counter.cpu_slice
2965         ss.bucket_counter.cpu_slice = 0
2966         ss.setServiceParent(self.s)
2967
2968         d = fireEventually()
2969
2970         def _after_first_prefix(ignored):
2971             state = ss.bucket_counter.state
2972             if state["last-complete-prefix"] is None:
2973                 d2 = fireEventually()
2974                 d2.addCallback(_after_first_prefix)
2975                 return d2
2976             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
2977             # now sneak in and mess with its state, to make sure it cleans up
2978             # properly at the end of the cycle
2979             self.failUnlessEqual(state["last-complete-prefix"],
2980                                  ss.bucket_counter.prefixes[0])
2981             state["bucket-counts"][-12] = {}
2982             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
2983             ss.bucket_counter.save_state()
2984         d.addCallback(_after_first_prefix)
2985
2986         # now give it enough time to complete a cycle
2987         def _watch():
2988             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
2989         d.addCallback(lambda ignored: self.poll(_watch))
2990         def _check2(ignored):
2991             ss.bucket_counter.cpu_slice = orig_cpu_slice
2992             s = ss.bucket_counter.get_state()
2993             self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
2994             self.failIf("bogusprefix!" in s["storage-index-samples"],
2995                         s["storage-index-samples"].keys())
2996         d.addCallback(_check2)
2997         return d
2998
2999     def test_bucket_counter_eta(self):
3000         basedir = "storage/BucketCounter/bucket_counter_eta"
3001         fileutil.make_dirs(basedir)
3002         ss = MyStorageServer(basedir, "\x00" * 20)
3003         ss.bucket_counter.slow_start = 0
3004         # these will be fired inside finished_prefix()
3005         hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
3006         w = StorageStatus(ss)
3007
3008         d = defer.Deferred()
3009
3010         def _check_1(ignored):
3011             # no ETA is available yet
3012             html = w.renderSynchronously()
3013             s = remove_tags(html)
3014             self.failUnlessIn("complete (next work", s)
3015
3016         def _check_2(ignored):
3017             # one prefix has finished, so an ETA based upon that elapsed time
3018             # should be available.
3019             html = w.renderSynchronously()
3020             s = remove_tags(html)
3021             self.failUnlessIn("complete (ETA ", s)
3022
3023         def _check_3(ignored):
3024             # two prefixes have finished
3025             html = w.renderSynchronously()
3026             s = remove_tags(html)
3027             self.failUnlessIn("complete (ETA ", s)
3028             d.callback("done")
3029
3030         hooks[0].addCallback(_check_1).addErrback(d.errback)
3031         hooks[1].addCallback(_check_2).addErrback(d.errback)
3032         hooks[2].addCallback(_check_3).addErrback(d.errback)
3033
3034         ss.setServiceParent(self.s)
3035         return d
3036
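     # (A crawler that a test can freeze right after the first bucket:
     # process_bucket() drives cpu_slice negative so the crawler yields
     # immediately, and yielding() restores a generous slice once the stop
     # flag has been consumed.)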
3037 class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
3038     stop_after_first_bucket = False
3039     def process_bucket(self, *args, **kwargs):
3040         LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
3041         if self.stop_after_first_bucket:
3042             self.stop_after_first_bucket = False
3043             self.cpu_slice = -1.0
3044     def yielding(self, sleep_time):
3045         if not self.stop_after_first_bucket:
3046             self.cpu_slice = 500
3047
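     # (No_ST_BLOCKS_LeaseCheckingCrawler simulates a platform whose stat()
     # results lack st_blocks by copying every other attribute onto a plain
     # object; the disk-usage accounting must cope without it.)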
3048 class BrokenStatResults:
3049     pass
3050 class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
3051     def stat(self, fn):
3052         s = os.stat(fn)
3053         bsr = BrokenStatResults()
3054         for attrname in dir(s):
3055             if attrname.startswith("_"):
3056                 continue
3057             if attrname == "st_blocks":
3058                 continue
3059             setattr(bsr, attrname, getattr(s, attrname))
3060         return bsr
3061
3062 class InstrumentedStorageServer(StorageServer):
3063     LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
3064 class No_ST_BLOCKS_StorageServer(StorageServer):
3065     LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3066
3067 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3068
3069     def setUp(self):
3070         self.s = service.MultiService()
3071         self.s.startService()
3072     def tearDown(self):
3073         return self.s.stopService()
3074
3075     def make_shares(self, ss):
3076         def make(si):
3077             return (si, hashutil.tagged_hash("renew", si),
3078                     hashutil.tagged_hash("cancel", si))
3079         def make_mutable(si):
3080             return (si, hashutil.tagged_hash("renew", si),
3081                     hashutil.tagged_hash("cancel", si),
3082                     hashutil.tagged_hash("write-enabler", si))
3083         def make_extra_lease(si, num):
3084             return (hashutil.tagged_hash("renew-%d" % num, si),
3085                     hashutil.tagged_hash("cancel-%d" % num, si))
3086
3087         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3088         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3089         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3090         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3091         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3092         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3093         sharenums = [0]
3094         canary = FakeCanary()
3095         # note: 'tahoe debug dump-share' will not handle this file, since the
3096         # inner contents are not a valid CHK share
3097         data = "\xff" * 1000
3098
3099         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3100                                          1000, canary)
3101         w[0].remote_write(0, data)
3102         w[0].remote_close()
3103
3104         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3105                                          1000, canary)
3106         w[0].remote_write(0, data)
3107         w[0].remote_close()
3108         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3109
3110         writev = ss.remote_slot_testv_and_readv_and_writev
3111         writev(mutable_si_2, (we2, rs2, cs2),
3112                {0: ([], [(0,data)], len(data))}, [])
3113         writev(mutable_si_3, (we3, rs3, cs3),
3114                {0: ([], [(0,data)], len(data))}, [])
3115         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3116
3117         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3118         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3119         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3120
3121     def test_basic(self):
3122         basedir = "storage/LeaseCrawler/basic"
3123         fileutil.make_dirs(basedir)
3124         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3125         # make it start sooner than usual.
3126         lc = ss.lease_checker
3127         lc.slow_start = 0
3128         lc.cpu_slice = 500
3129         lc.stop_after_first_bucket = True
3130         webstatus = StorageStatus(ss)
3131
3132         # create a few shares, with some leases on them
3133         self.make_shares(ss)
3134         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3135
3136         # add a non-sharefile to exercise another code path
3137         fn = os.path.join(ss.sharedir,
3138                           storage_index_to_dir(immutable_si_0),
3139                           "not-a-share")
3140         f = open(fn, "wb")
3141         f.write("I am not a share.\n")
3142         f.close()
3143
3144         # this is before the crawl has started, so we're not in a cycle yet
3145         initial_state = lc.get_state()
3146         self.failIf(lc.get_progress()["cycle-in-progress"])
3147         self.failIfIn("cycle-to-date", initial_state)
3148         self.failIfIn("estimated-remaining-cycle", initial_state)
3149         self.failIfIn("estimated-current-cycle", initial_state)
3150         self.failUnlessIn("history", initial_state)
3151         self.failUnlessEqual(initial_state["history"], {})
3152
3153         ss.setServiceParent(self.s)
3154
3155         DAY = 24*60*60
3156
3157         d = fireEventually()
3158
3159         # now examine the state right after the first bucket has been
3160         # processed.
3161         def _after_first_bucket(ignored):
3162             initial_state = lc.get_state()
3163             if "cycle-to-date" not in initial_state:
3164                 d2 = fireEventually()
3165                 d2.addCallback(_after_first_bucket)
3166                 return d2
3167             self.failUnlessIn("cycle-to-date", initial_state)
3168             self.failUnlessIn("estimated-remaining-cycle", initial_state)
3169             self.failUnlessIn("estimated-current-cycle", initial_state)
3170             self.failUnlessIn("history", initial_state)
3171             self.failUnlessEqual(initial_state["history"], {})
3172
3173             so_far = initial_state["cycle-to-date"]
3174             self.failUnlessEqual(so_far["expiration-enabled"], False)
3175             self.failUnlessIn("configured-expiration-mode", so_far)
3176             self.failUnlessIn("lease-age-histogram", so_far)
3177             lah = so_far["lease-age-histogram"]
3178             self.failUnlessEqual(type(lah), list)
3179             self.failUnlessEqual(len(lah), 1)
3180             self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
3181             self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
3182             self.failUnlessEqual(so_far["corrupt-shares"], [])
3183             sr1 = so_far["space-recovered"]
3184             self.failUnlessEqual(sr1["examined-buckets"], 1)
3185             self.failUnlessEqual(sr1["examined-shares"], 1)
3186             self.failUnlessEqual(sr1["actual-shares"], 0)
3187             self.failUnlessEqual(sr1["configured-diskbytes"], 0)
3188             self.failUnlessEqual(sr1["original-sharebytes"], 0)
3189             left = initial_state["estimated-remaining-cycle"]
3190             sr2 = left["space-recovered"]
3191             self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
3192             self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
3193             self.failIfEqual(sr2["actual-shares"], None)
3194             self.failIfEqual(sr2["configured-diskbytes"], None)
3195             self.failIfEqual(sr2["original-sharebytes"], None)
3196         d.addCallback(_after_first_bucket)
3197         d.addCallback(lambda ign: self.render1(webstatus))
3198         def _check_html_in_cycle(html):
3199             s = remove_tags(html)
3200             self.failUnlessIn("So far, this cycle has examined "
3201                               "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
3202             self.failUnlessIn("and has recovered: "
3203                               "0 shares, 0 buckets (0 mutable / 0 immutable), "
3204                               "0 B (0 B / 0 B)", s)
3205             self.failUnlessIn("If expiration were enabled, "
3206                               "we would have recovered: "
3207                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3208                               " 0 B (0 B / 0 B) by now", s)
3209             self.failUnlessIn("and the remainder of this cycle "
3210                               "would probably recover: "
3211                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3212                               " 0 B (0 B / 0 B)", s)
3213             self.failUnlessIn("and the whole cycle would probably recover: "
3214                               "0 shares, 0 buckets (0 mutable / 0 immutable),"
3215                               " 0 B (0 B / 0 B)", s)
3216             self.failUnlessIn("if we were strictly using each lease's default "
3217                               "31-day lease lifetime", s)
3218             self.failUnlessIn("this cycle would be expected to recover: ", s)
3219         d.addCallback(_check_html_in_cycle)
3220
3221         # wait for the crawler to finish the first cycle. Nothing should have
3222         # been removed.
3223         def _wait():
3224             return lc.get_state()["last-cycle-finished"] is not None
3225         d.addCallback(lambda ign: self.poll(_wait))
3226
3227         def _after_first_cycle(ignored):
3228             s = lc.get_state()
3229             self.failIf("cycle-to-date" in s)
3230             self.failIf("estimated-remaining-cycle" in s)
3231             self.failIf("estimated-current-cycle" in s)
3232             last = s["history"][0]
3233             self.failUnlessIn("cycle-start-finish-times", last)
3234             self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
3235             self.failUnlessEqual(last["expiration-enabled"], False)
3236             self.failUnlessIn("configured-expiration-mode", last)
3237
3238             self.failUnlessIn("lease-age-histogram", last)
3239             lah = last["lease-age-histogram"]
3240             self.failUnlessEqual(type(lah), list)
3241             self.failUnlessEqual(len(lah), 1)
3242             self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
3243
3244             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3245             self.failUnlessEqual(last["corrupt-shares"], [])
3246
3247             rec = last["space-recovered"]
3248             self.failUnlessEqual(rec["examined-buckets"], 4)
3249             self.failUnlessEqual(rec["examined-shares"], 4)
3250             self.failUnlessEqual(rec["actual-buckets"], 0)
3251             self.failUnlessEqual(rec["original-buckets"], 0)
3252             self.failUnlessEqual(rec["configured-buckets"], 0)
3253             self.failUnlessEqual(rec["actual-shares"], 0)
3254             self.failUnlessEqual(rec["original-shares"], 0)
3255             self.failUnlessEqual(rec["configured-shares"], 0)
3256             self.failUnlessEqual(rec["actual-diskbytes"], 0)
3257             self.failUnlessEqual(rec["original-diskbytes"], 0)
3258             self.failUnlessEqual(rec["configured-diskbytes"], 0)
3259             self.failUnlessEqual(rec["actual-sharebytes"], 0)
3260             self.failUnlessEqual(rec["original-sharebytes"], 0)
3261             self.failUnlessEqual(rec["configured-sharebytes"], 0)
3262
3263             def _get_sharefile(si):
3264                 return list(ss._iter_share_files(si))[0]
3265             def count_leases(si):
3266                 return len(list(_get_sharefile(si).get_leases()))
3267             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3268             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3269             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3270             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3271         d.addCallback(_after_first_cycle)
3272         d.addCallback(lambda ign: self.render1(webstatus))
3273         def _check_html(html):
3274             s = remove_tags(html)
3275             self.failUnlessIn("recovered: 0 shares, 0 buckets "
3276                               "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
3277             self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
3278                               "(2 mutable / 2 immutable),", s)
3279             self.failUnlessIn("but expiration was not enabled", s)
3280         d.addCallback(_check_html)
3281         d.addCallback(lambda ign: self.render_json(webstatus))
3282         def _check_json(json):
3283             data = simplejson.loads(json)
3284             self.failUnlessIn("lease-checker", data)
3285             self.failUnlessIn("lease-checker-progress", data)
3286         d.addCallback(_check_json)
3287         return d
3288
3289     def backdate_lease(self, sf, renew_secret, new_expire_time):
3290         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3291         # "renew" a lease with a new_expire_time that is older than what the
3292         # current lease has), so we have to reach inside it.
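             # (The mechanics: locate the lease by its renew secret,
             # overwrite its expiration_time, and write the record back at
             # the same index in the share file.)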
3293         for i,lease in enumerate(sf.get_leases()):
3294             if lease.renew_secret == renew_secret:
3295                 lease.expiration_time = new_expire_time
3296                 f = open(sf.home, 'rb+')
3297                 sf._write_lease_record(f, i, lease)
3298                 f.close()
3299                 return
3300         raise IndexError("unable to renew non-existent lease")
3301
3302     def test_expire_age(self):
3303         basedir = "storage/LeaseCrawler/expire_age"
3304         fileutil.make_dirs(basedir)
3305         # setting expiration_override_lease_duration to 2000 means that
3306         # any lease which is more than 2000s old will be expired.
3307         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3308                                        expiration_enabled=True,
3309                                        expiration_mode="age",
3310                                        expiration_override_lease_duration=2000)
3311         # make it start sooner than usual.
3312         lc = ss.lease_checker
3313         lc.slow_start = 0
3314         lc.stop_after_first_bucket = True
3315         webstatus = StorageStatus(ss)
3316
3317         # create a few shares, with some leases on them
3318         self.make_shares(ss)
3319         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3320
3321         def count_shares(si):
3322             return len(list(ss._iter_share_files(si)))
3323         def _get_sharefile(si):
3324             return list(ss._iter_share_files(si))[0]
3325         def count_leases(si):
3326             return len(list(_get_sharefile(si).get_leases()))
3327
3328         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3329         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3330         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3331         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3332         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3333         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3334         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3335         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3336
3337         # artificially crank back the expiration time on the first lease of
3338         # each share, to make it look like it expired already (age=1000s).
3339         # Some shares have an extra lease which is set to expire at the
3340         # default time in 31 days from now (age=31days). We then run the
3341         # crawler, which will expire the first lease, making some shares get
3342         # deleted and others stay alive (with one remaining lease)
3343         now = time.time()
3344
3345         sf0 = _get_sharefile(immutable_si_0)
3346         self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
3347         sf0_size = os.stat(sf0.home).st_size
3348
3349         # immutable_si_1 gets an extra lease
3350         sf1 = _get_sharefile(immutable_si_1)
3351         self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
3352
3353         sf2 = _get_sharefile(mutable_si_2)
3354         self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
3355         sf2_size = os.stat(sf2.home).st_size
3356
3357         # mutable_si_3 gets an extra lease
3358         sf3 = _get_sharefile(mutable_si_3)
3359         self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
3360
3361         ss.setServiceParent(self.s)
3362
3363         d = fireEventually()
3364         # examine the state right after the first bucket has been processed
3365         def _after_first_bucket(ignored):
3366             p = lc.get_progress()
3367             if not p["cycle-in-progress"]:
3368                 d2 = fireEventually()
3369                 d2.addCallback(_after_first_bucket)
3370                 return d2
3371         d.addCallback(_after_first_bucket)
3372         d.addCallback(lambda ign: self.render1(webstatus))
3373         def _check_html_in_cycle(html):
3374             s = remove_tags(html)
3375             # the first bucket encountered gets deleted, and its prefix
3376             # happens to be about 1/5th of the way through the ring, so the
3377             # predictor thinks we'll have 5 shares and that we'll delete them
3378             # all. This part of the test depends upon the SIs landing right
3379             # where they do now.
3380             self.failUnlessIn("The remainder of this cycle is expected to "
3381                               "recover: 4 shares, 4 buckets", s)
3382             self.failUnlessIn("The whole cycle is expected to examine "
3383                               "5 shares in 5 buckets and to recover: "
3384                               "5 shares, 5 buckets", s)
3385         d.addCallback(_check_html_in_cycle)
3386
3387         # wait for the crawler to finish the first cycle. Two shares should
3388         # have been removed
3389         def _wait():
3390             return lc.get_state()["last-cycle-finished"] is not None
3391         d.addCallback(lambda ign: self.poll(_wait))
3392
3393         def _after_first_cycle(ignored):
3394             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3395             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3396             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3397             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3398             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3399             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3400
3401             s = lc.get_state()
3402             last = s["history"][0]
3403
3404             self.failUnlessEqual(last["expiration-enabled"], True)
3405             self.failUnlessEqual(last["configured-expiration-mode"],
3406                                  ("age", 2000, None, ("mutable", "immutable")))
3407             self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
3408
3409             rec = last["space-recovered"]
3410             self.failUnlessEqual(rec["examined-buckets"], 4)
3411             self.failUnlessEqual(rec["examined-shares"], 4)
3412             self.failUnlessEqual(rec["actual-buckets"], 2)
3413             self.failUnlessEqual(rec["original-buckets"], 2)
3414             self.failUnlessEqual(rec["configured-buckets"], 2)
3415             self.failUnlessEqual(rec["actual-shares"], 2)
3416             self.failUnlessEqual(rec["original-shares"], 2)
3417             self.failUnlessEqual(rec["configured-shares"], 2)
3418             size = sf0_size + sf2_size
3419             self.failUnlessEqual(rec["actual-sharebytes"], size)
3420             self.failUnlessEqual(rec["original-sharebytes"], size)
3421             self.failUnlessEqual(rec["configured-sharebytes"], size)
3422             # different platforms have different notions of "blocks used by
3423             # this file", so merely assert that it's a number
3424             self.failUnless(rec["actual-diskbytes"] >= 0,
3425                             rec["actual-diskbytes"])
3426             self.failUnless(rec["original-diskbytes"] >= 0,
3427                             rec["original-diskbytes"])
3428             self.failUnless(rec["configured-diskbytes"] >= 0,
3429                             rec["configured-diskbytes"])
3430         d.addCallback(_after_first_cycle)
3431         d.addCallback(lambda ign: self.render1(webstatus))
3432         def _check_html(html):
3433             s = remove_tags(html)
3434             self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
3435             self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
3436             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3437         d.addCallback(_check_html)
3438         return d
3439
3440     def test_expire_cutoff_date(self):
3441         basedir = "storage/LeaseCrawler/expire_cutoff_date"
3442         fileutil.make_dirs(basedir)
3443         # setting cutoff-date to 2000 seconds ago means that any lease which
3444         # is more than 2000s old will be expired.
3445         now = time.time()
3446         then = int(now - 2000)
3447         ss = InstrumentedStorageServer(basedir, "\x00" * 20,
3448                                        expiration_enabled=True,
3449                                        expiration_mode="cutoff-date",
3450                                        expiration_cutoff_date=then)
3451         # make it start sooner than usual.
3452         lc = ss.lease_checker
3453         lc.slow_start = 0
3454         lc.stop_after_first_bucket = True
3455         webstatus = StorageStatus(ss)
3456
3457         # create a few shares, with some leases on them
3458         self.make_shares(ss)
3459         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3460
3461         def count_shares(si):
3462             return len(list(ss._iter_share_files(si)))
3463         def _get_sharefile(si):
3464             return list(ss._iter_share_files(si))[0]
3465         def count_leases(si):
3466             return len(list(_get_sharefile(si).get_leases()))
3467
3468         self.failUnlessEqual(count_shares(immutable_si_0), 1)
3469         self.failUnlessEqual(count_leases(immutable_si_0), 1)
3470         self.failUnlessEqual(count_shares(immutable_si_1), 1)
3471         self.failUnlessEqual(count_leases(immutable_si_1), 2)
3472         self.failUnlessEqual(count_shares(mutable_si_2), 1)
3473         self.failUnlessEqual(count_leases(mutable_si_2), 1)
3474         self.failUnlessEqual(count_shares(mutable_si_3), 1)
3475         self.failUnlessEqual(count_leases(mutable_si_3), 2)
3476
3477         # artificially crank back the expiration time on the first lease of
3478         # each share, to make it look like it was renewed 3000s ago. To achieve
3479         # this, we need to set the expiration time to now-3000+31days. This
3480         # will change when the lease format is improved to contain both
3481         # create/renew time and duration.
3482         new_expiration_time = now - 3000 + 31*24*60*60
3483
3484         # Some shares have an extra lease which is set to expire at the
3485         # default time in 31 days from now (age=31days). We then run the
3486         # crawler, which will expire the first lease, making some shares get
3487         # deleted and others stay alive (with one remaining lease)
3488
3489         sf0 = _get_sharefile(immutable_si_0)
3490         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3491         sf0_size = os.stat(sf0.home).st_size
3492
3493         # immutable_si_1 gets an extra lease
3494         sf1 = _get_sharefile(immutable_si_1)
3495         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3496
3497         sf2 = _get_sharefile(mutable_si_2)
3498         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3499         sf2_size = os.stat(sf2.home).st_size
3500
3501         # mutable_si_3 gets an extra lease
3502         sf3 = _get_sharefile(mutable_si_3)
3503         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3504
3505         ss.setServiceParent(self.s)
3506
3507         d = fireEventually()
3508         # examine the state right after the first bucket has been processed
3509         def _after_first_bucket(ignored):
3510             p = lc.get_progress()
3511             if not p["cycle-in-progress"]:
3512                 d2 = fireEventually()
3513                 d2.addCallback(_after_first_bucket)
3514                 return d2
3515         d.addCallback(_after_first_bucket)
3516         d.addCallback(lambda ign: self.render1(webstatus))
3517         def _check_html_in_cycle(html):
3518             s = remove_tags(html)
3519             # the first bucket encountered gets deleted, and its prefix
3520             # happens to be about 1/5th of the way through the ring, so the
3521             # predictor thinks we'll have 5 shares and that we'll delete them
3522             # all. This part of the test depends upon the SIs landing right
3523             # where they do now.
3524             self.failUnlessIn("The remainder of this cycle is expected to "
3525                               "recover: 4 shares, 4 buckets", s)
3526             self.failUnlessIn("The whole cycle is expected to examine "
3527                               "5 shares in 5 buckets and to recover: "
3528                               "5 shares, 5 buckets", s)
3529         d.addCallback(_check_html_in_cycle)
3530
3531         # wait for the crawler to finish the first cycle. Two shares should
3532         # have been removed
3533         def _wait():
3534             return bool(lc.get_state()["last-cycle-finished"] is not None)
3535         d.addCallback(lambda ign: self.poll(_wait))
3536
3537         def _after_first_cycle(ignored):
3538             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3539             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3540             self.failUnlessEqual(count_leases(immutable_si_1), 1)
3541             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3542             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3543             self.failUnlessEqual(count_leases(mutable_si_3), 1)
3544
3545             s = lc.get_state()
3546             last = s["history"][0]
3547
3548             self.failUnlessEqual(last["expiration-enabled"], True)
3549             self.failUnlessEqual(last["configured-expiration-mode"],
3550                                  ("cutoff-date", None, then,
3551                                   ("mutable", "immutable")))
3552             self.failUnlessEqual(last["leases-per-share-histogram"],
3553                                  {1: 2, 2: 2})
3554
3555             rec = last["space-recovered"]
3556             self.failUnlessEqual(rec["examined-buckets"], 4)
3557             self.failUnlessEqual(rec["examined-shares"], 4)
3558             self.failUnlessEqual(rec["actual-buckets"], 2)
3559             self.failUnlessEqual(rec["original-buckets"], 0)
3560             self.failUnlessEqual(rec["configured-buckets"], 2)
3561             self.failUnlessEqual(rec["actual-shares"], 2)
3562             self.failUnlessEqual(rec["original-shares"], 0)
3563             self.failUnlessEqual(rec["configured-shares"], 2)
3564             size = sf0_size + sf2_size
3565             self.failUnlessEqual(rec["actual-sharebytes"], size)
3566             self.failUnlessEqual(rec["original-sharebytes"], 0)
3567             self.failUnlessEqual(rec["configured-sharebytes"], size)
3568             # different platforms have different notions of "blocks used by
3569             # this file", so merely assert that it's a number
3570             self.failUnless(rec["actual-diskbytes"] >= 0,
3571                             rec["actual-diskbytes"])
3572             self.failUnless(rec["original-diskbytes"] >= 0,
3573                             rec["original-diskbytes"])
3574             self.failUnless(rec["configured-diskbytes"] >= 0,
3575                             rec["configured-diskbytes"])
3576         d.addCallback(_after_first_cycle)
3577         d.addCallback(lambda ign: self.render1(webstatus))
3578         def _check_html(html):
3579             s = remove_tags(html)
3580             self.failUnlessIn("Expiration Enabled:"
3581                               " expired leases will be removed", s)
3582             date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
3583             substr = "Leases created or last renewed before %s will be considered expired." % date
3584             self.failUnlessIn(substr, s)
3585             self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
3586         d.addCallback(_check_html)
3587         return d
3588
3589     def test_only_immutable(self):
3590         basedir = "storage/LeaseCrawler/only_immutable"
3591         fileutil.make_dirs(basedir)
3592         now = time.time()
3593         then = int(now - 2000)
3594         ss = StorageServer(basedir, "\x00" * 20,
3595                            expiration_enabled=True,
3596                            expiration_mode="cutoff-date",
3597                            expiration_cutoff_date=then,
3598                            expiration_sharetypes=("immutable",))
3599         lc = ss.lease_checker
3600         lc.slow_start = 0
3601         webstatus = StorageStatus(ss)
3602
3603         self.make_shares(ss)
3604         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3605         # set all leases to be expirable
3606         new_expiration_time = now - 3000 + 31*24*60*60
3607
3608         def count_shares(si):
3609             return len(list(ss._iter_share_files(si)))
3610         def _get_sharefile(si):
3611             return list(ss._iter_share_files(si))[0]
3612         def count_leases(si):
3613             return len(list(_get_sharefile(si).get_leases()))
3614
3615         sf0 = _get_sharefile(immutable_si_0)
3616         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3617         sf1 = _get_sharefile(immutable_si_1)
3618         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3619         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3620         sf2 = _get_sharefile(mutable_si_2)
3621         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3622         sf3 = _get_sharefile(mutable_si_3)
3623         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3624         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3625
3626         ss.setServiceParent(self.s)
3627         def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
3629         d = self.poll(_wait)
3630
3631         def _after_first_cycle(ignored):
3632             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3633             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3634             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3635             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3636             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3637             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3638         d.addCallback(_after_first_cycle)
3639         d.addCallback(lambda ign: self.render1(webstatus))
3640         def _check_html(html):
3641             s = remove_tags(html)
3642             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3643         d.addCallback(_check_html)
3644         return d
3645
3646     def test_only_mutable(self):
3647         basedir = "storage/LeaseCrawler/only_mutable"
3648         fileutil.make_dirs(basedir)
3649         now = time.time()
3650         then = int(now - 2000)
3651         ss = StorageServer(basedir, "\x00" * 20,
3652                            expiration_enabled=True,
3653                            expiration_mode="cutoff-date",
3654                            expiration_cutoff_date=then,
3655                            expiration_sharetypes=("mutable",))
3656         lc = ss.lease_checker
3657         lc.slow_start = 0
3658         webstatus = StorageStatus(ss)
3659
3660         self.make_shares(ss)
3661         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3662         # set all leases to be expirable
3663         new_expiration_time = now - 3000 + 31*24*60*60
3664
3665         def count_shares(si):
3666             return len(list(ss._iter_share_files(si)))
3667         def _get_sharefile(si):
3668             return list(ss._iter_share_files(si))[0]
3669         def count_leases(si):
3670             return len(list(_get_sharefile(si).get_leases()))
3671
3672         sf0 = _get_sharefile(immutable_si_0)
3673         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3674         sf1 = _get_sharefile(immutable_si_1)
3675         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3676         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3677         sf2 = _get_sharefile(mutable_si_2)
3678         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3679         sf3 = _get_sharefile(mutable_si_3)
3680         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3681         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3682
3683         ss.setServiceParent(self.s)
3684         def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
3686         d = self.poll(_wait)
3687
3688         def _after_first_cycle(ignored):
3689             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3690             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3691             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3692             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3693             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3694             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3695         d.addCallback(_after_first_cycle)
3696         d.addCallback(lambda ign: self.render1(webstatus))
3697         def _check_html(html):
3698             s = remove_tags(html)
3699             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3700         d.addCallback(_check_html)
3701         return d
3702
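    # The sharetype filter exercised by the two tests above, sketched as
    # an assumed helper (not the expirer's actual code): a lease is only
    # removed when it is past the cutoff AND its container's type appears
    # in expiration_sharetypes.
    @staticmethod
    def _should_expire(renewal_time, cutoff_date, sharetype, sharetypes):
        return sharetype in sharetypes and renewal_time < cutoff_date
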
3703     def test_bad_mode(self):
3704         basedir = "storage/LeaseCrawler/bad_mode"
3705         fileutil.make_dirs(basedir)
3706         e = self.failUnlessRaises(ValueError,
3707                                   StorageServer, basedir, "\x00" * 20,
3708                                   expiration_mode="bogus")
3709         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3710
3711     def test_parse_duration(self):
3712         DAY = 24*60*60
3713         MONTH = 31*DAY
3714         YEAR = 365*DAY
3715         p = time_format.parse_duration
3716         self.failUnlessEqual(p("7days"), 7*DAY)
3717         self.failUnlessEqual(p("31day"), 31*DAY)
3718         self.failUnlessEqual(p("60 days"), 60*DAY)
3719         self.failUnlessEqual(p("2mo"), 2*MONTH)
3720         self.failUnlessEqual(p("3 month"), 3*MONTH)
3721         self.failUnlessEqual(p("2years"), 2*YEAR)
3722         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3723         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3724
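    # For reference, a simplified parser that accepts the same inputs as
    # the assertions above; an assumed sketch, not
    # time_format.parse_duration itself.
    @staticmethod
    def _parse_duration_sketch(s):
        DAY = 24*60*60
        units = {"day": DAY, "mo": 31*DAY, "month": 31*DAY, "year": 365*DAY}
        m = re.match(r"\s*(\d+)\s*([a-z]+?)s?\s*$", s)
        if m and m.group(2) in units:
            return int(m.group(1)) * units[m.group(2)]
        raise ValueError("no unit (like day, month, or year) in %r" % s)
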
3725     def test_parse_date(self):
3726         p = time_format.parse_date
3727         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3728         self.failUnlessEqual(p("2009-03-18"), 1237334400)
3729
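    # The expected value above can be reproduced with the stdlib, assuming
    # parse_date interprets YYYY-MM-DD as midnight UTC (a cross-check for
    # illustration, not the time_format implementation):
    #   _utc_date_to_epoch("2009-03-18") == 1237334400
    @staticmethod
    def _utc_date_to_epoch(datestr):
        import calendar
        return calendar.timegm(time.strptime(datestr, "%Y-%m-%d"))
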
3730     def test_limited_history(self):
3731         basedir = "storage/LeaseCrawler/limited_history"
3732         fileutil.make_dirs(basedir)
3733         ss = StorageServer(basedir, "\x00" * 20)
3734         # make it start sooner than usual.
3735         lc = ss.lease_checker
3736         lc.slow_start = 0
3737         lc.cpu_slice = 500
3738
3739         # create a few shares, with some leases on them
3740         self.make_shares(ss)
3741
3742         ss.setServiceParent(self.s)
3743
3744         def _wait_until_15_cycles_done():
3745             last = lc.state["last-cycle-finished"]
3746             if last is not None and last >= 15:
3747                 return True
3748             if lc.timer:
3749                 lc.timer.reset(0)
3750             return False
3751         d = self.poll(_wait_until_15_cycles_done)
3752
3753         def _check(ignored):
3754             s = lc.get_state()
3755             h = s["history"]
3756             self.failUnlessEqual(len(h), 10)
3757             self.failUnlessEqual(max(h.keys()), 15)
3758             self.failUnlessEqual(min(h.keys()), 6)
3759         d.addCallback(_check)
3760         return d
3761
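    # Bookkeeping implied by the bounds checked above, sketched as an
    # assumed helper (not the crawler's actual code): retain at most the
    # ten most recent cycles, keyed by cycle number.
    @staticmethod
    def _add_history_entry(history, cycle, entry, maxlen=10):
        history[cycle] = entry
        while len(history) > maxlen:
            del history[min(history)]
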
3762     def test_unpredictable_future(self):
3763         basedir = "storage/LeaseCrawler/unpredictable_future"
3764         fileutil.make_dirs(basedir)
3765         ss = StorageServer(basedir, "\x00" * 20)
3766         # make it start sooner than usual.
3767         lc = ss.lease_checker
3768         lc.slow_start = 0
3769         lc.cpu_slice = -1.0 # stop quickly
3770
3771         self.make_shares(ss)
3772
3773         ss.setServiceParent(self.s)
3774
3775         d = fireEventually()
3776         def _check(ignored):
3777             # this should fire after the first bucket is complete, but before
3778             # the first prefix is complete, so the progress-measurer won't
3779             # think we've gotten far enough to raise our percent-complete
3780             # above 0%, triggering the cannot-predict-the-future code in
3781             # expirer.py . This will have to change if/when the
            # expirer.py. This will have to change if/when the
3783             # have to interrupt it even earlier, before it's finished the
3784             # first bucket).
3785             s = lc.get_state()
3786             if "cycle-to-date" not in s:
3787                 d2 = fireEventually()
3788                 d2.addCallback(_check)
3789                 return d2
3790             self.failUnlessIn("cycle-to-date", s)
3791             self.failUnlessIn("estimated-remaining-cycle", s)
3792             self.failUnlessIn("estimated-current-cycle", s)
3793
3794             left = s["estimated-remaining-cycle"]["space-recovered"]
3795             self.failUnlessEqual(left["actual-buckets"], None)
3796             self.failUnlessEqual(left["original-buckets"], None)
3797             self.failUnlessEqual(left["configured-buckets"], None)
3798             self.failUnlessEqual(left["actual-shares"], None)
3799             self.failUnlessEqual(left["original-shares"], None)
3800             self.failUnlessEqual(left["configured-shares"], None)
3801             self.failUnlessEqual(left["actual-diskbytes"], None)
3802             self.failUnlessEqual(left["original-diskbytes"], None)
3803             self.failUnlessEqual(left["configured-diskbytes"], None)
3804             self.failUnlessEqual(left["actual-sharebytes"], None)
3805             self.failUnlessEqual(left["original-sharebytes"], None)
3806             self.failUnlessEqual(left["configured-sharebytes"], None)
3807
            full = s["estimated-current-cycle"]["space-recovered"]
3809             self.failUnlessEqual(full["actual-buckets"], None)
3810             self.failUnlessEqual(full["original-buckets"], None)
3811             self.failUnlessEqual(full["configured-buckets"], None)
3812             self.failUnlessEqual(full["actual-shares"], None)
3813             self.failUnlessEqual(full["original-shares"], None)
3814             self.failUnlessEqual(full["configured-shares"], None)
3815             self.failUnlessEqual(full["actual-diskbytes"], None)
3816             self.failUnlessEqual(full["original-diskbytes"], None)
3817             self.failUnlessEqual(full["configured-diskbytes"], None)
3818             self.failUnlessEqual(full["actual-sharebytes"], None)
3819             self.failUnlessEqual(full["original-sharebytes"], None)
3820             self.failUnlessEqual(full["configured-sharebytes"], None)
3821
3822         d.addCallback(_check)
3823         return d
3824
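    # The all-None estimate asserted above, rebuilt from the counter names
    # the test itself uses (a restatement for reference, not new behavior):
    @staticmethod
    def _unknown_space_recovered():
        counters = {}
        for prefix in ("actual", "original", "configured"):
            for kind in ("buckets", "shares", "sharebytes", "diskbytes"):
                counters["%s-%s" % (prefix, kind)] = None
        return counters
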
3825     def test_no_st_blocks(self):
3826         basedir = "storage/LeaseCrawler/no_st_blocks"
3827         fileutil.make_dirs(basedir)
3828         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3829                                         expiration_mode="age",
3830                                         expiration_override_lease_duration=-1000)
        # a negative expiration_override_lease_duration means the
        # "configured-" space-recovered counts will be non-zero, since all
        # shares will have expired by then
3834
3835         # make it start sooner than usual.
3836         lc = ss.lease_checker
3837         lc.slow_start = 0
3838
3839         self.make_shares(ss)
3840         ss.setServiceParent(self.s)
3841         def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
3843         d = self.poll(_wait)
3844
3845         def _check(ignored):
3846             s = lc.get_state()
3847             last = s["history"][0]
3848             rec = last["space-recovered"]
3849             self.failUnlessEqual(rec["configured-buckets"], 4)
3850             self.failUnlessEqual(rec["configured-shares"], 4)
3851             self.failUnless(rec["configured-sharebytes"] > 0,
3852                             rec["configured-sharebytes"])
3853             # without the .st_blocks field in os.stat() results, we should be
3854             # reporting diskbytes==sharebytes
3855             self.failUnlessEqual(rec["configured-sharebytes"],
3856                                  rec["configured-diskbytes"])
3857         d.addCallback(_check)
3858         return d
3859
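    # The fallback test_no_st_blocks relies on, sketched under the
    # assumption (this is not the real fileutil code) that disk usage is
    # derived from POSIX st_blocks, counted in 512-byte units, when the
    # platform provides it:
    @staticmethod
    def _disk_bytes_for(path):
        s = os.stat(path)
        if hasattr(s, "st_blocks"):
            return s.st_blocks * 512
        # no st_blocks: report the apparent size, so diskbytes == sharebytes
        return s.st_size
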
3860     def test_share_corruption(self):
3861         self._poll_should_ignore_these_errors = [
3862             UnknownMutableContainerVersionError,
3863             UnknownImmutableContainerVersionError,
3864             ]
3865         basedir = "storage/LeaseCrawler/share_corruption"
3866         fileutil.make_dirs(basedir)
3867         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3868         w = StorageStatus(ss)
3869         # make it start sooner than usual.
3870         lc = ss.lease_checker
3871         lc.stop_after_first_bucket = True
3872         lc.slow_start = 0
3873         lc.cpu_slice = 500
3874
3875         # create a few shares, with some leases on them
3876         self.make_shares(ss)
3877
3878         # now corrupt one, and make sure the lease-checker keeps going
3879         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3880         first = min(self.sis)
3881         first_b32 = base32.b2a(first)
3882         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3883         f = open(fn, "rb+")
3884         f.seek(0)
3885         f.write("BAD MAGIC")
3886         f.close()
3887         # if get_share_file() doesn't see the correct mutable magic, it
3888         # assumes the file is an immutable share, and then
3889         # immutable.ShareFile sees a bad version. So regardless of which kind
3890         # of share we corrupted, this will trigger an
3891         # UnknownImmutableContainerVersionError.
3892
3893         # also create an empty bucket
3894         empty_si = base32.b2a("\x04"*16)
3895         empty_bucket_dir = os.path.join(ss.sharedir,
3896                                         storage_index_to_dir(empty_si))
3897         fileutil.make_dirs(empty_bucket_dir)
3898
3899         ss.setServiceParent(self.s)
3900
3901         d = fireEventually()
3902
3903         # now examine the state right after the first bucket has been
3904         # processed.
3905         def _after_first_bucket(ignored):
3906             s = lc.get_state()
3907             if "cycle-to-date" not in s:
3908                 d2 = fireEventually()
3909                 d2.addCallback(_after_first_bucket)
3910                 return d2
3911             so_far = s["cycle-to-date"]
3912             rec = so_far["space-recovered"]
3913             self.failUnlessEqual(rec["examined-buckets"], 1)
3914             self.failUnlessEqual(rec["examined-shares"], 0)
3915             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3916         d.addCallback(_after_first_bucket)
3917
3918         d.addCallback(lambda ign: self.render_json(w))
3919         def _check_json(json):
3920             data = simplejson.loads(json)
3921             # grr. json turns all dict keys into strings.
3922             so_far = data["lease-checker"]["cycle-to-date"]
3923             corrupt_shares = so_far["corrupt-shares"]
3924             # it also turns all tuples into lists
3925             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3926         d.addCallback(_check_json)
3927         d.addCallback(lambda ign: self.render1(w))
3928         def _check_html(html):
3929             s = remove_tags(html)
3930             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3931         d.addCallback(_check_html)
3932
3933         def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
3935         d.addCallback(lambda ign: self.poll(_wait))
3936
3937         def _after_first_cycle(ignored):
3938             s = lc.get_state()
3939             last = s["history"][0]
3940             rec = last["space-recovered"]
3941             self.failUnlessEqual(rec["examined-buckets"], 5)
3942             self.failUnlessEqual(rec["examined-shares"], 3)
3943             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3944         d.addCallback(_after_first_cycle)
3945         d.addCallback(lambda ign: self.render_json(w))
3946         def _check_json_history(json):
3947             data = simplejson.loads(json)
3948             last = data["lease-checker"]["history"]["0"]
3949             corrupt_shares = last["corrupt-shares"]
3950             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3951         d.addCallback(_check_json_history)
3952         d.addCallback(lambda ign: self.render1(w))
3953         def _check_html_history(html):
3954             s = remove_tags(html)
3955             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3956         d.addCallback(_check_html_history)
3957
3958         def _cleanup(res):
3959             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3960                                    UnknownImmutableContainerVersionError)
3961             return res
3962         d.addBoth(_cleanup)
3963         return d
3964
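    # The magic-based dispatch described in the comments above, sketched
    # for illustration (assuming MutableShareFile.MAGIC is the on-disk
    # marker; this is not the real get_share_file()): a container is read
    # as mutable only when its header bytes match the mutable magic.
    @staticmethod
    def _looks_mutable(filename):
        f = open(filename, "rb")
        try:
            return f.read(len(MutableShareFile.MAGIC)) == MutableShareFile.MAGIC
        finally:
            f.close()
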
3965     def render_json(self, page):
3966         d = self.render1(page, args={"t": ["json"]})
3967         return d
3968
3969 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3970
3971     def setUp(self):
3972         self.s = service.MultiService()
3973         self.s.startService()
3974     def tearDown(self):
3975         return self.s.stopService()
3976
3977     def test_no_server(self):
3978         w = StorageStatus(None)
3979         html = w.renderSynchronously()
3980         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3981
3982     def test_status(self):
3983         basedir = "storage/WebStatus/status"
3984         fileutil.make_dirs(basedir)
3985         nodeid = "\x00" * 20
3986         ss = StorageServer(basedir, nodeid)
3987         ss.setServiceParent(self.s)
3988         w = StorageStatus(ss, "nickname")
3989         d = self.render1(w)
3990         def _check_html(html):
3991             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3992             s = remove_tags(html)
3993             self.failUnlessIn("Server Nickname: nickname", s)
3994             self.failUnlessIn("Server Nodeid: %s"  % base32.b2a(nodeid), s)
3995             self.failUnlessIn("Accepting new shares: Yes", s)
3996             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3997         d.addCallback(_check_html)
3998         d.addCallback(lambda ign: self.render_json(w))
3999         def _check_json(json):
4000             data = simplejson.loads(json)
4001             s = data["stats"]
4002             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4003             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
4004             self.failUnlessIn("bucket-counter", data)
4005             self.failUnlessIn("lease-checker", data)
4006         d.addCallback(_check_json)
4007         return d
4008
4009     def render_json(self, page):
4010         d = self.render1(page, args={"t": ["json"]})
4011         return d
4012
4013     def test_status_no_disk_stats(self):
4014         def call_get_disk_stats(whichdir, reserved_space=0):
4015             raise AttributeError()
4016         self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4017
4018         # Some platforms may have no disk stats API. Make sure the code can handle that
4019         # (test runs on all platforms).
4020         basedir = "storage/WebStatus/status_no_disk_stats"
4021         fileutil.make_dirs(basedir)
4022         ss = StorageServer(basedir, "\x00" * 20)
4023         ss.setServiceParent(self.s)
4024         w = StorageStatus(ss)
4025         html = w.renderSynchronously()
4026         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4027         s = remove_tags(html)
4028         self.failUnlessIn("Accepting new shares: Yes", s)
4029         self.failUnlessIn("Total disk space: ?", s)
4030         self.failUnlessIn("Space Available to Tahoe: ?", s)
4031         self.failUnless(ss.get_available_space() is None)
4032
4033     def test_status_bad_disk_stats(self):
4034         def call_get_disk_stats(whichdir, reserved_space=0):
4035             raise OSError()
4036         self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4037
4038         # If the API to get disk stats exists but a call to it fails, then the status should
4039         # show that no shares will be accepted, and get_available_space() should be 0.
4040         basedir = "storage/WebStatus/status_bad_disk_stats"
4041         fileutil.make_dirs(basedir)
4042         ss = StorageServer(basedir, "\x00" * 20)
4043         ss.setServiceParent(self.s)
4044         w = StorageStatus(ss)
4045         html = w.renderSynchronously()
4046         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4047         s = remove_tags(html)
4048         self.failUnlessIn("Accepting new shares: No", s)
4049         self.failUnlessIn("Total disk space: ?", s)
4050         self.failUnlessIn("Space Available to Tahoe: ?", s)
4051         self.failUnlessEqual(ss.get_available_space(), 0)
4052
4053     def test_status_right_disk_stats(self):
4054         GB = 1000000000
4055         total            = 5*GB
4056         free_for_root    = 4*GB
4057         free_for_nonroot = 3*GB
4058         reserved         = 1*GB
4059
4060         basedir = "storage/WebStatus/status_right_disk_stats"
4061         fileutil.make_dirs(basedir)
4062         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved)
4063         expecteddir = ss.sharedir
4064
4065         def call_get_disk_stats(whichdir, reserved_space=0):
4066             self.failUnlessEqual(whichdir, expecteddir)
4067             self.failUnlessEqual(reserved_space, reserved)
4068             used = total - free_for_root
4069             avail = max(free_for_nonroot - reserved_space, 0)
4070             return {
4071               'total': total,
4072               'free_for_root': free_for_root,
4073               'free_for_nonroot': free_for_nonroot,
4074               'used': used,
4075               'avail': avail,
4076             }
4077         self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
4078
4079         ss.setServiceParent(self.s)
4080         w = StorageStatus(ss)
4081         html = w.renderSynchronously()
4082
4083         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4084         s = remove_tags(html)
4085         self.failUnlessIn("Total disk space: 5.00 GB", s)
4086         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4087         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4088         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4089         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4090         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4091         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4092
4093     def test_readonly(self):
4094         basedir = "storage/WebStatus/readonly"
4095         fileutil.make_dirs(basedir)
4096         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4097         ss.setServiceParent(self.s)
4098         w = StorageStatus(ss)
4099         html = w.renderSynchronously()
4100         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4101         s = remove_tags(html)
4102         self.failUnlessIn("Accepting new shares: No", s)
4103
4104     def test_reserved(self):
4105         basedir = "storage/WebStatus/reserved"
4106         fileutil.make_dirs(basedir)
4107         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4108         ss.setServiceParent(self.s)
4109         w = StorageStatus(ss)
4110         html = w.renderSynchronously()
4111         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4112         s = remove_tags(html)
4113         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4114
    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        # reserve more than 2**32 bytes, so that genuinely huge values
        # exercise the same SI formatting checked in test_util
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e9)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 GB (10000000000)", s)
4125
4126     def test_util(self):
4127         w = StorageStatus(None)
4128         self.failUnlessEqual(w.render_space(None, None), "?")
4129         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4130         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4131         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4132         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4133         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
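    # The abbreviations above follow SI (base-1000) units; a minimal
    # sketch of that formatting (assumed for illustration, not
    # allmydata.util.abbreviate):
    @staticmethod
    def _abbrev_space_sketch(n):
        if n is None:
            return "?"
        for factor, suffix in ((1e12, "TB"), (1e9, "GB"),
                               (1e6, "MB"), (1e3, "kB")):
            if n >= factor:
                return "%.2f %s" % (n / factor, suffix)
        return "%d B" % n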