]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
549b839fa7bdb76787bf4758500a7f3cbb8948ff
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1 import time, os.path, platform, stat, re, simplejson, struct, shutil
2
3 import mock
4
5 from twisted.trial import unittest
6
7 from twisted.internet import defer
8 from twisted.application import service
9 from foolscap.api import fireEventually
10 import itertools
11 from allmydata import interfaces
12 from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
13 from allmydata.storage.server import StorageServer
14 from allmydata.storage.mutable import MutableShareFile
15 from allmydata.storage.immutable import BucketWriter, BucketReader
16 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
17      UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
18 from allmydata.storage.lease import LeaseInfo
19 from allmydata.storage.crawler import BucketCountingCrawler
20 from allmydata.storage.expirer import LeaseCheckingCrawler
21 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22      ReadBucketProxy
23 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
24                                      LayoutInvalid, MDMFSIGNABLEHEADER, \
25                                      SIGNED_PREFIX, MDMFHEADER, \
26                                      MDMFOFFSETS, SDMFSlotWriteProxy, \
27                                      PRIVATE_KEY_SIZE, \
28                                      SIGNATURE_SIZE, \
29                                      VERIFICATION_KEY_SIZE, \
30                                      SHARE_HASH_CHAIN_SIZE
31 from allmydata.interfaces import BadWriteEnablerError
32 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
33 from allmydata.test.common_web import WebRenderingMixin
34 from allmydata.test.no_network import NoNetworkServer
35 from allmydata.web.storage import StorageStatus, remove_prefix
36
class Marker:
    """Opaque token type.

    FakeCanary creates one instance per registered disconnect callback and
    uses it as the dictionary key under which the callback is stored.
    """
class FakeCanary:
    """Test double for a canary object that records disconnect callbacks.

    Registered callbacks are kept in ``self.disconnectors``, keyed by a
    fresh Marker. When constructed with ``ignore_disconnectors=True`` both
    registration and removal do nothing, so the dictionary stays empty.
    """

    def __init__(self, ignore_disconnectors=False):
        self.disconnectors = {}
        self.ignore = ignore_disconnectors

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """Record ``(f, args, kwargs)`` and return the Marker it is stored under.

        Returns None, recording nothing, when this canary ignores
        disconnectors.
        """
        if not self.ignore:
            marker = Marker()
            self.disconnectors[marker] = (f, args, kwargs)
            return marker
        return None

    def dontNotifyOnDisconnect(self, marker):
        """Drop the callback stored under ``marker``.

        Does nothing when this canary ignores disconnectors; otherwise an
        unknown marker raises KeyError.
        """
        if not self.ignore:
            del self.disconnectors[marker]
53
class FakeStatsProvider:
    """Stats-provider test double: every method accepts its arguments and
    does nothing."""

    def count(self, name, delta=1):
        """Accept a counter update and discard it."""
        return None

    def register_producer(self, producer):
        """Accept a stats producer and discard it."""
        return None
59
60 class Bucket(unittest.TestCase):
61     def make_workdir(self, name):
62         basedir = os.path.join("storage", "Bucket", name)
63         incoming = os.path.join(basedir, "tmp", "bucket")
64         final = os.path.join(basedir, "bucket")
65         fileutil.make_dirs(basedir)
66         fileutil.make_dirs(os.path.join(basedir, "tmp"))
67         return incoming, final
68
69     def bucket_writer_closed(self, bw, consumed):
70         pass
71     def add_latency(self, category, latency):
72         pass
73     def count(self, name, delta=1):
74         pass
75
76     def make_lease(self):
77         owner_num = 0
78         renew_secret = os.urandom(32)
79         cancel_secret = os.urandom(32)
80         expiration_time = time.time() + 5000
81         return LeaseInfo(owner_num, renew_secret, cancel_secret,
82                          expiration_time, "\x00" * 20)
83
84     def test_create(self):
85         incoming, final = self.make_workdir("test_create")
86         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
87                           FakeCanary())
88         bw.remote_write(0, "a"*25)
89         bw.remote_write(25, "b"*25)
90         bw.remote_write(50, "c"*25)
91         bw.remote_write(75, "d"*7)
92         bw.remote_close()
93
94     def test_readwrite(self):
95         incoming, final = self.make_workdir("test_readwrite")
96         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
97                           FakeCanary())
98         bw.remote_write(0, "a"*25)
99         bw.remote_write(25, "b"*25)
100         bw.remote_write(50, "c"*7) # last block may be short
101         bw.remote_close()
102
103         # now read from it
104         br = BucketReader(self, bw.finalhome)
105         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
106         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
107         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
108
class RemoteBucket:
    """Local stand-in for a remote reference.

    ``callRemote("foo", ...)`` calls ``self.target.remote_foo(...)`` through
    ``defer.maybeDeferred``. ``target`` is not set by ``__init__``; users of
    this class assign it after construction. The two counters track how
    many ``slot_readv`` calls and how many calls whose name contains
    ``writev`` have been made.
    """

    def __init__(self):
        self.write_count = 0
        self.read_count = 0

    def callRemote(self, methname, *args, **kwargs):
        """Count the call if it is a read/write vector call, then dispatch it
        to the target's ``remote_<methname>`` and return the Deferred."""
        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        def _invoke():
            # attribute lookup happens inside maybeDeferred, as in the call itself
            return getattr(self.target, "remote_" + methname)(*args, **kwargs)

        return defer.maybeDeferred(_invoke)
126
127
128 class BucketProxy(unittest.TestCase):
129     def make_bucket(self, name, size):
130         basedir = os.path.join("storage", "BucketProxy", name)
131         incoming = os.path.join(basedir, "tmp", "bucket")
132         final = os.path.join(basedir, "bucket")
133         fileutil.make_dirs(basedir)
134         fileutil.make_dirs(os.path.join(basedir, "tmp"))
135         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
136                           FakeCanary())
137         rb = RemoteBucket()
138         rb.target = bw
139         return bw, rb, final
140
141     def make_lease(self):
142         owner_num = 0
143         renew_secret = os.urandom(32)
144         cancel_secret = os.urandom(32)
145         expiration_time = time.time() + 5000
146         return LeaseInfo(owner_num, renew_secret, cancel_secret,
147                          expiration_time, "\x00" * 20)
148
149     def bucket_writer_closed(self, bw, consumed):
150         pass
151     def add_latency(self, category, latency):
152         pass
153     def count(self, name, delta=1):
154         pass
155
156     def test_create(self):
157         bw, rb, sharefname = self.make_bucket("test_create", 500)
158         bp = WriteBucketProxy(rb, None,
159                               data_size=300,
160                               block_size=10,
161                               num_segments=5,
162                               num_share_hashes=3,
163                               uri_extension_size_max=500)
164         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
165
166     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
167         # Let's pretend each share has 100 bytes of data, and that there are
168         # 4 segments (25 bytes each), and 8 shares total. So the two
169         # per-segment merkle trees (crypttext_hash_tree,
170         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
171         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
172         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
173         # long. That should make the whole share:
174         #
175         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
176         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
177
178         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
179
180         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
181                             for i in range(7)]
182         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
183                         for i in range(7)]
184         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
185                         for i in (1,9,13)]
186         uri_extension = "s" + "E"*498 + "e"
187
188         bw, rb, sharefname = self.make_bucket(name, sharesize)
189         bp = wbp_class(rb, None,
190                        data_size=95,
191                        block_size=25,
192                        num_segments=4,
193                        num_share_hashes=3,
194                        uri_extension_size_max=len(uri_extension))
195
196         d = bp.put_header()
197         d.addCallback(lambda res: bp.put_block(0, "a"*25))
198         d.addCallback(lambda res: bp.put_block(1, "b"*25))
199         d.addCallback(lambda res: bp.put_block(2, "c"*25))
200         d.addCallback(lambda res: bp.put_block(3, "d"*20))
201         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
202         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
203         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
204         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
205         d.addCallback(lambda res: bp.close())
206
207         # now read everything back
208         def _start_reading(res):
209             br = BucketReader(self, sharefname)
210             rb = RemoteBucket()
211             rb.target = br
212             server = NoNetworkServer("abc", None)
213             rbp = rbp_class(rb, server, storage_index="")
214             self.failUnlessIn("to peer", repr(rbp))
215             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
216
217             d1 = rbp.get_block_data(0, 25, 25)
218             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
219             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
220             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
221             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
222             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
223             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
224             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
225
226             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
227             d1.addCallback(lambda res:
228                            self.failUnlessEqual(res, crypttext_hashes))
229             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
230             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
231             d1.addCallback(lambda res: rbp.get_share_hashes())
232             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
233             d1.addCallback(lambda res: rbp.get_uri_extension())
234             d1.addCallback(lambda res:
235                            self.failUnlessEqual(res, uri_extension))
236
237             return d1
238
239         d.addCallback(_start_reading)
240
241         return d
242
243     def test_readwrite_v1(self):
244         return self._do_test_readwrite("test_readwrite_v1",
245                                        0x24, WriteBucketProxy, ReadBucketProxy)
246
247     def test_readwrite_v2(self):
248         return self._do_test_readwrite("test_readwrite_v2",
249                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
250
251 class Server(unittest.TestCase):
252
253     def setUp(self):
254         self.sparent = LoggingServiceParent()
255         self.sparent.startService()
256         self._lease_secret = itertools.count()
257     def tearDown(self):
258         return self.sparent.stopService()
259
260     def workdir(self, name):
261         basedir = os.path.join("storage", "Server", name)
262         return basedir
263
264     def create(self, name, reserved_space=0, klass=StorageServer):
265         workdir = self.workdir(name)
266         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
267                    stats_provider=FakeStatsProvider())
268         ss.setServiceParent(self.sparent)
269         return ss
270
271     def test_create(self):
272         self.create("test_create")
273
274     def allocate(self, ss, storage_index, sharenums, size, canary=None):
275         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
276         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
277         if not canary:
278             canary = FakeCanary()
279         return ss.remote_allocate_buckets(storage_index,
280                                           renew_secret, cancel_secret,
281                                           sharenums, size, canary)
282
283     def test_large_share(self):
284         syslow = platform.system().lower()
285         if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
286             raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
287
288         avail = fileutil.get_available_space('.', 512*2**20)
289         if avail <= 4*2**30:
290             raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
291
292         ss = self.create("test_large_share")
293
294         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
295         self.failUnlessEqual(already, set())
296         self.failUnlessEqual(set(writers.keys()), set([0]))
297
298         shnum, bucket = writers.items()[0]
299         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
300         bucket.remote_write(2**32, "ab")
301         bucket.remote_close()
302
303         readers = ss.remote_get_buckets("allocate")
304         reader = readers[shnum]
305         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
306
307     def test_dont_overfill_dirs(self):
308         """
309         This test asserts that if you add a second share whose storage index
310         share lots of leading bits with an extant share (but isn't the exact
311         same storage index), this won't add an entry to the share directory.
312         """
313         ss = self.create("test_dont_overfill_dirs")
314         already, writers = self.allocate(ss, "storageindex", [0], 10)
315         for i, wb in writers.items():
316             wb.remote_write(0, "%10d" % i)
317             wb.remote_close()
318         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
319                                 "shares")
320         children_of_storedir = set(os.listdir(storedir))
321
322         # Now store another one under another storageindex that has leading
323         # chars the same as the first storageindex.
324         already, writers = self.allocate(ss, "storageindey", [0], 10)
325         for i, wb in writers.items():
326             wb.remote_write(0, "%10d" % i)
327             wb.remote_close()
328         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
329                                 "shares")
330         new_children_of_storedir = set(os.listdir(storedir))
331         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
332
333     def test_remove_incoming(self):
334         ss = self.create("test_remove_incoming")
335         already, writers = self.allocate(ss, "vid", range(3), 10)
336         for i,wb in writers.items():
337             wb.remote_write(0, "%10d" % i)
338             wb.remote_close()
339         incoming_share_dir = wb.incominghome
340         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
341         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
342         incoming_dir = os.path.dirname(incoming_prefix_dir)
343         self.failIf(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
344         self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
345         self.failUnless(os.path.exists(incoming_dir), incoming_dir)
346
347     def test_abort(self):
348         # remote_abort, when called on a writer, should make sure that
349         # the allocated size of the bucket is not counted by the storage
350         # server when accounting for space.
351         ss = self.create("test_abort")
352         already, writers = self.allocate(ss, "allocate", [0, 1, 2], 150)
353         self.failIfEqual(ss.allocated_size(), 0)
354
355         # Now abort the writers.
356         for writer in writers.itervalues():
357             writer.remote_abort()
358         self.failUnlessEqual(ss.allocated_size(), 0)
359
360
361     def test_allocate(self):
362         ss = self.create("test_allocate")
363
364         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
365
366         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
367         self.failUnlessEqual(already, set())
368         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
369
370         # while the buckets are open, they should not count as readable
371         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
372
373         # close the buckets
374         for i,wb in writers.items():
375             wb.remote_write(0, "%25d" % i)
376             wb.remote_close()
377             # aborting a bucket that was already closed is a no-op
378             wb.remote_abort()
379
380         # now they should be readable
381         b = ss.remote_get_buckets("allocate")
382         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
383         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
384         b_str = str(b[0])
385         self.failUnlessIn("BucketReader", b_str)
386         self.failUnlessIn("mfwgy33dmf2g 0", b_str)
387
388         # now if we ask about writing again, the server should offer those
389         # three buckets as already present. It should offer them even if we
390         # don't ask about those specific ones.
391         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
392         self.failUnlessEqual(already, set([0,1,2]))
393         self.failUnlessEqual(set(writers.keys()), set([3,4]))
394
395         # while those two buckets are open for writing, the server should
396         # refuse to offer them to uploaders
397
398         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
399         self.failUnlessEqual(already2, set([0,1,2]))
400         self.failUnlessEqual(set(writers2.keys()), set([5]))
401
402         # aborting the writes should remove the tempfiles
403         for i,wb in writers2.items():
404             wb.remote_abort()
405         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
406         self.failUnlessEqual(already2, set([0,1,2]))
407         self.failUnlessEqual(set(writers2.keys()), set([5]))
408
409         for i,wb in writers2.items():
410             wb.remote_abort()
411         for i,wb in writers.items():
412             wb.remote_abort()
413
414     def test_bad_container_version(self):
415         ss = self.create("test_bad_container_version")
416         a,w = self.allocate(ss, "si1", [0], 10)
417         w[0].remote_write(0, "\xff"*10)
418         w[0].remote_close()
419
420         fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
421         f = open(fn, "rb+")
422         f.seek(0)
423         f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
424         f.close()
425
426         ss.remote_get_buckets("allocate")
427
428         e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
429                                   ss.remote_get_buckets, "si1")
430         self.failUnlessIn(" had version 0 but we wanted 1", str(e))
431
432     def test_disconnect(self):
433         # simulate a disconnection
434         ss = self.create("test_disconnect")
435         canary = FakeCanary()
436         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
437         self.failUnlessEqual(already, set())
438         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
439         for (f,args,kwargs) in canary.disconnectors.values():
440             f(*args, **kwargs)
441         del already
442         del writers
443
444         # that ought to delete the incoming shares
445         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
446         self.failUnlessEqual(already, set())
447         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
448
    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_reserved_space(self, mock_get_disk_stats):
        """Check how many share writers the server hands out when disk
        space is limited by ``reserved_space``.

        ``fileutil.get_disk_stats`` is replaced by a mock, so the free
        space the server sees is whatever this test puts in
        ``mock_get_disk_stats.return_value``. The test counts entries in
        ``ss._active_writers`` after allocating, dropping and closing
        writers.

        NOTE(review): the counts checked right after each ``del`` presumably
        rely on writers being released as soon as the last reference to
        them goes away -- confirm against StorageServer.
        """
        reserved_space=10000
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000,
            'avail': max(15000 - reserved_space, 0),
            }

        ss = self.create("test_reserved_space", reserved_space=reserved_space)
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
        # FakeCanary(True) records no disconnect callbacks at all
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        # the loop variable still refers to the last writer; drop it too
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually lower the reported free space by the amount
        # just allocated, since we're not doing real disk measurements
        mock_get_disk_stats.return_value = {
            'free_for_nonroot': 15000 - allocated,
            'avail': max(15000 - allocated - reserved_space, 0),
            }

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss
514
515     def test_seek(self):
516         basedir = self.workdir("test_seek_behavior")
517         fileutil.make_dirs(basedir)
518         filename = os.path.join(basedir, "testfile")
519         f = open(filename, "wb")
520         f.write("start")
521         f.close()
522         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
523         # files. mode="a" preserves previous contents but does not allow
524         # seeking-to-create-holes. mode="r+" allows both.
525         f = open(filename, "rb+")
526         f.seek(100)
527         f.write("100")
528         f.close()
529         filelen = os.stat(filename)[stat.ST_SIZE]
530         self.failUnlessEqual(filelen, 100+3)
531         f2 = open(filename, "rb")
532         self.failUnlessEqual(f2.read(5), "start")
533
534
    def test_leases(self):
        """Exercise lease creation, renewal and cancellation on immutable
        shares.

        Storage indexes used: "si0" (one lease), "si1" (three leases, the
        third added with remote_add_lease), "si18" (never created) and
        "si3" (two overlapping uploads). Every renew/cancel secret is a
        tagged hash of the next value of ``self._lease_secret``, so all
        secrets in this test differ from one another.
        """
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        # first lease: five shares of si0 under (rs0, cs0)
        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        # first lease on si1
        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        # all five shares already exist, so no new writers are handed out
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it. A renew secret, or another lease's cancel secret,
        # must be rejected
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        # a second upload of the same shares, started while the first is
        # still open, gets neither existing shares nor writers
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        # only the first upload's lease exists at this point
        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        # repeating the second request after the shares are closed reports
        # all five as present and adds its lease
        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)
660
661     def test_readonly(self):
662         workdir = self.workdir("test_readonly")
663         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
664         ss.setServiceParent(self.sparent)
665
666         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
667         self.failUnlessEqual(already, set())
668         self.failUnlessEqual(writers, {})
669
670         stats = ss.get_stats()
671         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
672         if "storage_server.disk_avail" in stats:
673             # Some platforms may not have an API to get disk stats.
674             # But if there are stats, readonly_storage means disk_avail=0
675             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
676
677     def test_discard(self):
678         # discard is really only used for other tests, but we test it anyways
679         workdir = self.workdir("test_discard")
680         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
681         ss.setServiceParent(self.sparent)
682
683         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
684         self.failUnlessEqual(already, set())
685         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
686         for i,wb in writers.items():
687             wb.remote_write(0, "%25d" % i)
688             wb.remote_close()
689         # since we discard the data, the shares should be present but sparse.
690         # Since we write with some seeks, the data we read back will be all
691         # zeros.
692         b = ss.remote_get_buckets("vid")
693         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
694         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
695
696     def test_advise_corruption(self):
697         workdir = self.workdir("test_advise_corruption")
698         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
699         ss.setServiceParent(self.sparent)
700
701         si0_s = base32.b2a("si0")
702         ss.remote_advise_corrupt_share("immutable", "si0", 0,
703                                        "This share smells funny.\n")
704         reportdir = os.path.join(workdir, "corruption-advisories")
705         reports = os.listdir(reportdir)
706         self.failUnlessEqual(len(reports), 1)
707         report_si0 = reports[0]
708         self.failUnlessIn(si0_s, report_si0)
709         f = open(os.path.join(reportdir, report_si0), "r")
710         report = f.read()
711         f.close()
712         self.failUnlessIn("type: immutable", report)
713         self.failUnlessIn("storage_index: %s" % si0_s, report)
714         self.failUnlessIn("share_number: 0", report)
715         self.failUnlessIn("This share smells funny.", report)
716
717         # test the RIBucketWriter version too
718         si1_s = base32.b2a("si1")
719         already,writers = self.allocate(ss, "si1", [1], 75)
720         self.failUnlessEqual(already, set())
721         self.failUnlessEqual(set(writers.keys()), set([1]))
722         writers[1].remote_write(0, "data")
723         writers[1].remote_close()
724
725         b = ss.remote_get_buckets("si1")
726         self.failUnlessEqual(set(b.keys()), set([1]))
727         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
728
729         reports = os.listdir(reportdir)
730         self.failUnlessEqual(len(reports), 2)
731         report_si1 = [r for r in reports if si1_s in r][0]
732         f = open(os.path.join(reportdir, report_si1), "r")
733         report = f.read()
734         f.close()
735         self.failUnlessIn("type: immutable", report)
736         self.failUnlessIn("storage_index: %s" % si1_s, report)
737         self.failUnlessIn("share_number: 1", report)
738         self.failUnlessIn("This share tastes like dust.", report)
739
740
741
class MutableServer(unittest.TestCase):
    """Tests that call a StorageServer's mutable-slot methods directly.

    Each test builds its own server under storage/MutableServer/<name> and
    attaches it to a LoggingServiceParent, which tearDown() stops.
    """

    # Lease attributes that identify a lease independently of when it was
    # last renewed; compare_leases() additionally checks expiration_time.
    _LEASE_IDENTITY_ATTRS = ("owner_num", "renew_secret", "cancel_secret",
                             "nodeid")

    def setUp(self):
        """Create the service parent and a counter used to make lease tags."""
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()

    def tearDown(self):
        """Stop the service parent and return whatever stopService() returns."""
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the relative base directory used by the test called name."""
        return os.path.join("storage", "MutableServer", name)

    def create(self, name):
        """Build a StorageServer in workdir(name), attach it, and return it."""
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        """Creating and attaching a server must not raise."""
        self.create("test_create")

    def write_enabler(self, we_tag):
        """Return hashutil.tagged_hash("we_blah", we_tag)."""
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        """Return hashutil.tagged_hash("renew_blah", str(tag))."""
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        """Return hashutil.tagged_hash("cancel_blah", str(tag))."""
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        """Send an empty test-and-write for every share number in sharenums.

        The secrets are derived from we_tag (write enabler) and lease_tag
        (renew and cancel secrets). The call must report success and return
        an empty read-vector result. The size argument is accepted but not
        used by this helper.
        """
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        # each share gets its own (testv, writev, new_length) triple
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        """Reading a share whose leading bytes were overwritten must raise
        UnknownMutableContainerVersionError with a descriptive message."""
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        try:
            # clobber the start of the share file in place
            f.seek(0)
            f.write("BAD MAGIC")
        finally:
            f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnlessIn(" had magic ", str(e))
        self.failUnlessIn(" but we wanted ", str(e))

    def test_container_size(self):
        """Exercise the new_length argument: growing, exceeding MAX_SIZE,
        shrinking, and dropping to zero."""
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        all_shares_empty_readv = (True, {0:[],1:[],2:[]})

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, all_shares_empty_readv)

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, all_shares_empty_readv)

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, all_shares_empty_readv)

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        """Allocate three shares, then check reads, a write, a rejected
        write enabler, and two failing test vectors."""
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        # Check the rendered message rather than testing containment on the
        # exception object itself, which depends on the deprecated
        # sequence behaviour of Python 2 exceptions.
        self.failUnlessIn("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.", str(f))

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        """Run every test-vector operator against bytes 10..14 of share 0.

        The data we're comparing is '11111' in all cases. Both failing and
        passing specimens are tried, and the share is reset after each one.
        """
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            # put the original 100 bytes back into share 0
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        def check(shnum, operator, specimen, fill, should_pass):
            # Attempt to overwrite share shnum with fill*100, guarded by
            # one test vector, then verify both the answer and the
            # resulting contents of share 0. Passing cases are only
            # meaningful for shnum 0, since share 0 is what gets read back.
            label = "share %d, testv (10, 5, %r, %r)" % (shnum, operator,
                                                         specimen)
            answer = write("si1", secrets,
                           {shnum: ([(10, 5, operator, specimen),
                                     ],
                                    [(0, fill*100)],
                                    None,
                                    )}, [(10,5)])
            self.failUnlessEqual(answer, (should_pass, {0: ["11111"]}), label)
            if should_pass:
                expected = fill*100
            else:
                expected = data
            self.failUnlessEqual(read("si1", [0], [(0,100)]),
                                 {0: [expected]}, label)

        reset()

        # the first case also checks that share 0 is the only share
        check(0, "lt", "11110", "x", False)
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        # (operator, specimen, fill character, should the test vector pass)
        cases = [
            ("lt", "11111", "x", False),
            ("lt", "11112", "y", True),

            ("le", "11110", "x", False),
            ("le", "11111", "y", True),
            ("le", "11112", "y", True),

            ("eq", "11112", "x", False),
            ("eq", "11111", "y", True),

            ("ne", "11111", "x", False),
            ("ne", "11112", "y", True),

            ("ge", "11110", "y", True),
            ("ge", "11111", "y", True),
            ("ge", "11112", "y", False),

            ("gt", "11110", "y", True),
            ("gt", "11111", "x", False),
            ("gt", "11112", "x", False),
            ]
        for (operator, specimen, fill, should_pass) in cases:
            check(0, operator, specimen, fill, should_pass)
            reset()

        # finally, test some operators against empty shares
        check(1, "eq", "11112", "x", False)
        reset()

    def test_readv(self):
        """Write three shares in one call, then read all of them back with
        an empty share list."""
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def _compare_lease_attrs(self, leases_a, leases_b, attrs):
        """Assert both lease lists have equal length and that each pair of
        leases agrees on every attribute named in attrs."""
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for (a, b) in zip(leases_a, leases_b):
            for attr in attrs:
                self.failUnlessEqual(getattr(a, attr), getattr(b, attr))

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        """Assert the two lease lists match, ignoring expiration_time."""
        self._compare_lease_attrs(leases_a, leases_b,
                                  self._LEASE_IDENTITY_ATTRS)

    def compare_leases(self, leases_a, leases_b):
        """Assert the two lease lists match, including expiration_time."""
        self._compare_lease_attrs(leases_a, leases_b,
                                  self._LEASE_IDENTITY_ATTRS
                                  + ("expiration_time",))

    def test_leases(self):
        """Add, renew, and cancel leases on a mutable share, checking the
        lease list on share 0 after each step."""
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        try:
            f.write("you ought to be ignoring me\n")
        finally:
            f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        def lease_count():
            return len(list(s0.get_leases()))
        self.failUnlessEqual(lease_count(), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(lease_count(), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(lease_count(), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(lease_count(), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(lease_count(), 3)
        for n in (3, 4, 5):
            write("si1", secrets(n), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(lease_count(), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(lease_count(), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        for n in range(5):
            ss.remote_renew_lease("si1", secrets(n)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())

        # renewing with a bogus token should prompt an error message.
        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnlessIn("Unable to renew non-existent lease", e_s)
        self.failUnlessIn("I have leases accepted by nodeids:", e_s)
        self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel all of them except the one made with secrets(4)
        for n in range(4):
            ss.remote_cancel_lease("si1", secrets(n)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(lease_count(), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(lease_count(), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

    def test_remove(self):
        """Delete shares one at a time by writing new_length=0, then check
        that the bucket directory is gone while its prefix directory stays."""
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )

        remaining = [0, 1, 2]
        for shnum in [0, 1, 2]:
            # delete this share by setting its size to zero
            answer = writev("si1", secrets,
                            {shnum: ([], [], 0)},
                            [])
            # the answer should mention all the shares that existed before
            # the write
            existed_before = dict([ (n, []) for n in remaining ])
            self.failUnlessEqual(answer, (True, existed_before))
            # but a new read should show only the shares that are left
            remaining.remove(shnum)
            still_there = dict([ (n, [""]) for n in remaining ])
            self.failUnlessEqual(readv("si1", [], [(0,10)]), still_there)

        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefix = si[:2]
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir), prefixdir)
        self.failIf(os.path.exists(bucketdir), bucketdir)
1309
1310
1311 class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1312     def setUp(self):
1313         self.sparent = LoggingServiceParent()
1314         self._lease_secret = itertools.count()
1315         self.ss = self.create("MDMFProxies storage test server")
1316         self.rref = RemoteBucket()
1317         self.rref.target = self.ss
1318         self.secrets = (self.write_enabler("we_secret"),
1319                         self.renew_secret("renew_secret"),
1320                         self.cancel_secret("cancel_secret"))
1321         self.segment = "aaaaaa"
1322         self.block = "aa"
1323         self.salt = "a" * 16
1324         self.block_hash = "a" * 32
1325         self.block_hash_tree = [self.block_hash for i in xrange(6)]
1326         self.share_hash = self.block_hash
1327         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1328         self.signature = "foobarbaz"
1329         self.verification_key = "vvvvvv"
1330         self.encprivkey = "private"
1331         self.root_hash = self.block_hash
1332         self.salt_hash = self.root_hash
1333         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1334         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1335         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1336         # blockhashes and salt hashes are serialized in the same way,
1337         # only we lop off the first element and store that in the
1338         # header.
1339         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1340
1341
1342     def tearDown(self):
1343         self.sparent.stopService()
1344         shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1345
1346
1347     def write_enabler(self, we_tag):
1348         return hashutil.tagged_hash("we_blah", we_tag)
1349
1350
1351     def renew_secret(self, tag):
1352         return hashutil.tagged_hash("renew_blah", str(tag))
1353
1354
1355     def cancel_secret(self, tag):
1356         return hashutil.tagged_hash("cancel_blah", str(tag))
1357
1358
1359     def workdir(self, name):
1360         basedir = os.path.join("storage", "MutableServer", name)
1361         return basedir
1362
1363
1364     def create(self, name):
1365         workdir = self.workdir(name)
1366         ss = StorageServer(workdir, "\x00" * 20)
1367         ss.setServiceParent(self.sparent)
1368         return ss
1369
1370
1371     def build_test_mdmf_share(self, tail_segment=False, empty=False):
1372         # Start with the checkstring
1373         data = struct.pack(">BQ32s",
1374                            1,
1375                            0,
1376                            self.root_hash)
1377         self.checkstring = data
1378         # Next, the encoding parameters
1379         if tail_segment:
1380             data += struct.pack(">BBQQ",
1381                                 3,
1382                                 10,
1383                                 6,
1384                                 33)
1385         elif empty:
1386             data += struct.pack(">BBQQ",
1387                                 3,
1388                                 10,
1389                                 0,
1390                                 0)
1391         else:
1392             data += struct.pack(">BBQQ",
1393                                 3,
1394                                 10,
1395                                 6,
1396                                 36)
1397         # Now we'll build the offsets.
1398         sharedata = ""
1399         if not tail_segment and not empty:
1400             for i in xrange(6):
1401                 sharedata += self.salt + self.block
1402         elif tail_segment:
1403             for i in xrange(5):
1404                 sharedata += self.salt + self.block
1405             sharedata += self.salt + "a"
1406
1407         # The encrypted private key comes after the shares + salts
1408         offset_size = struct.calcsize(MDMFOFFSETS)
1409         encrypted_private_key_offset = len(data) + offset_size
1410         # The share has chain comes after the private key
1411         sharehashes_offset = encrypted_private_key_offset + \
1412             len(self.encprivkey)
1413
1414         # The signature comes after the share hash chain.
1415         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1416
1417         verification_key_offset = signature_offset + len(self.signature)
1418         verification_key_end = verification_key_offset + \
1419             len(self.verification_key)
1420
1421         share_data_offset = offset_size
1422         share_data_offset += PRIVATE_KEY_SIZE
1423         share_data_offset += SIGNATURE_SIZE
1424         share_data_offset += VERIFICATION_KEY_SIZE
1425         share_data_offset += SHARE_HASH_CHAIN_SIZE
1426
1427         blockhashes_offset = share_data_offset + len(sharedata)
1428         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
1429
1430         data += struct.pack(MDMFOFFSETS,
1431                             encrypted_private_key_offset,
1432                             sharehashes_offset,
1433                             signature_offset,
1434                             verification_key_offset,
1435                             verification_key_end,
1436                             share_data_offset,
1437                             blockhashes_offset,
1438                             eof_offset)
1439
1440         self.offsets = {}
1441         self.offsets['enc_privkey'] = encrypted_private_key_offset
1442         self.offsets['block_hash_tree'] = blockhashes_offset
1443         self.offsets['share_hash_chain'] = sharehashes_offset
1444         self.offsets['signature'] = signature_offset
1445         self.offsets['verification_key'] = verification_key_offset
1446         self.offsets['share_data'] = share_data_offset
1447         self.offsets['verification_key_end'] = verification_key_end
1448         self.offsets['EOF'] = eof_offset
1449
1450         # the private key,
1451         data += self.encprivkey
1452         # the sharehashes
1453         data += self.share_hash_chain_s
1454         # the signature,
1455         data += self.signature
1456         # and the verification key
1457         data += self.verification_key
1458         # Then we'll add in gibberish until we get to the right point.
1459         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
1460         data += nulls
1461
1462         # Then the share data
1463         data += sharedata
1464         # the blockhashes
1465         data += self.block_hash_tree_s
1466         return data
1467
1468
1469     def write_test_share_to_server(self,
1470                                    storage_index,
1471                                    tail_segment=False,
1472                                    empty=False):
1473         """
1474         I write some data for the read tests to read to self.ss
1475
1476         If tail_segment=True, then I will write a share that has a
1477         smaller tail segment than other segments.
1478         """
1479         write = self.ss.remote_slot_testv_and_readv_and_writev
1480         data = self.build_test_mdmf_share(tail_segment, empty)
1481         # Finally, we write the whole thing to the storage server in one
1482         # pass.
1483         testvs = [(0, 1, "eq", "")]
1484         tws = {}
1485         tws[0] = (testvs, [(0, data)], None)
1486         readv = [(0, 1)]
1487         results = write(storage_index, self.secrets, tws, readv)
1488         self.failUnless(results[0])
1489
1490
1491     def build_test_sdmf_share(self, empty=False):
1492         if empty:
1493             sharedata = ""
1494         else:
1495             sharedata = self.segment * 6
1496         self.sharedata = sharedata
1497         blocksize = len(sharedata) / 3
1498         block = sharedata[:blocksize]
1499         self.blockdata = block
1500         prefix = struct.pack(">BQ32s16s BBQQ",
1501                              0, # version,
1502                              0,
1503                              self.root_hash,
1504                              self.salt,
1505                              3,
1506                              10,
1507                              len(sharedata),
1508                              len(sharedata),
1509                             )
1510         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1511         signature_offset = post_offset + len(self.verification_key)
1512         sharehashes_offset = signature_offset + len(self.signature)
1513         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
1514         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
1515         encprivkey_offset = sharedata_offset + len(block)
1516         eof_offset = encprivkey_offset + len(self.encprivkey)
1517         offsets = struct.pack(">LLLLQQ",
1518                               signature_offset,
1519                               sharehashes_offset,
1520                               blockhashes_offset,
1521                               sharedata_offset,
1522                               encprivkey_offset,
1523                               eof_offset)
1524         final_share = "".join([prefix,
1525                            offsets,
1526                            self.verification_key,
1527                            self.signature,
1528                            self.share_hash_chain_s,
1529                            self.block_hash_tree_s,
1530                            block,
1531                            self.encprivkey])
1532         self.offsets = {}
1533         self.offsets['signature'] = signature_offset
1534         self.offsets['share_hash_chain'] = sharehashes_offset
1535         self.offsets['block_hash_tree'] = blockhashes_offset
1536         self.offsets['share_data'] = sharedata_offset
1537         self.offsets['enc_privkey'] = encprivkey_offset
1538         self.offsets['EOF'] = eof_offset
1539         return final_share
1540
1541
1542     def write_sdmf_share_to_server(self,
1543                                    storage_index,
1544                                    empty=False):
1545         # Some tests need SDMF shares to verify that we can still 
1546         # read them. This method writes one, which resembles but is not
1547         assert self.rref
1548         write = self.ss.remote_slot_testv_and_readv_and_writev
1549         share = self.build_test_sdmf_share(empty)
1550         testvs = [(0, 1, "eq", "")]
1551         tws = {}
1552         tws[0] = (testvs, [(0, share)], None)
1553         readv = []
1554         results = write(storage_index, self.secrets, tws, readv)
1555         self.failUnless(results[0])
1556
1557
1558     def test_read(self):
1559         self.write_test_share_to_server("si1")
1560         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1561         # Check that every method equals what we expect it to.
1562         d = defer.succeed(None)
1563         def _check_block_and_salt((block, salt)):
1564             self.failUnlessEqual(block, self.block)
1565             self.failUnlessEqual(salt, self.salt)
1566
1567         for i in xrange(6):
1568             d.addCallback(lambda ignored, i=i:
1569                 mr.get_block_and_salt(i))
1570             d.addCallback(_check_block_and_salt)
1571
1572         d.addCallback(lambda ignored:
1573             mr.get_encprivkey())
1574         d.addCallback(lambda encprivkey:
1575             self.failUnlessEqual(self.encprivkey, encprivkey))
1576
1577         d.addCallback(lambda ignored:
1578             mr.get_blockhashes())
1579         d.addCallback(lambda blockhashes:
1580             self.failUnlessEqual(self.block_hash_tree, blockhashes))
1581
1582         d.addCallback(lambda ignored:
1583             mr.get_sharehashes())
1584         d.addCallback(lambda sharehashes:
1585             self.failUnlessEqual(self.share_hash_chain, sharehashes))
1586
1587         d.addCallback(lambda ignored:
1588             mr.get_signature())
1589         d.addCallback(lambda signature:
1590             self.failUnlessEqual(signature, self.signature))
1591
1592         d.addCallback(lambda ignored:
1593             mr.get_verification_key())
1594         d.addCallback(lambda verification_key:
1595             self.failUnlessEqual(verification_key, self.verification_key))
1596
1597         d.addCallback(lambda ignored:
1598             mr.get_seqnum())
1599         d.addCallback(lambda seqnum:
1600             self.failUnlessEqual(seqnum, 0))
1601
1602         d.addCallback(lambda ignored:
1603             mr.get_root_hash())
1604         d.addCallback(lambda root_hash:
1605             self.failUnlessEqual(self.root_hash, root_hash))
1606
1607         d.addCallback(lambda ignored:
1608             mr.get_seqnum())
1609         d.addCallback(lambda seqnum:
1610             self.failUnlessEqual(0, seqnum))
1611
1612         d.addCallback(lambda ignored:
1613             mr.get_encoding_parameters())
1614         def _check_encoding_parameters((k, n, segsize, datalen)):
1615             self.failUnlessEqual(k, 3)
1616             self.failUnlessEqual(n, 10)
1617             self.failUnlessEqual(segsize, 6)
1618             self.failUnlessEqual(datalen, 36)
1619         d.addCallback(_check_encoding_parameters)
1620
1621         d.addCallback(lambda ignored:
1622             mr.get_checkstring())
1623         d.addCallback(lambda checkstring:
1624             self.failUnlessEqual(checkstring, checkstring))
1625         return d
1626
1627
1628     def test_read_with_different_tail_segment_size(self):
1629         self.write_test_share_to_server("si1", tail_segment=True)
1630         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1631         d = mr.get_block_and_salt(5)
1632         def _check_tail_segment(results):
1633             block, salt = results
1634             self.failUnlessEqual(len(block), 1)
1635             self.failUnlessEqual(block, "a")
1636         d.addCallback(_check_tail_segment)
1637         return d
1638
1639
1640     def test_get_block_with_invalid_segnum(self):
1641         self.write_test_share_to_server("si1")
1642         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1643         d = defer.succeed(None)
1644         d.addCallback(lambda ignored:
1645             self.shouldFail(LayoutInvalid, "test invalid segnum",
1646                             None,
1647                             mr.get_block_and_salt, 7))
1648         return d
1649
1650
1651     def test_get_encoding_parameters_first(self):
1652         self.write_test_share_to_server("si1")
1653         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1654         d = mr.get_encoding_parameters()
1655         def _check_encoding_parameters((k, n, segment_size, datalen)):
1656             self.failUnlessEqual(k, 3)
1657             self.failUnlessEqual(n, 10)
1658             self.failUnlessEqual(segment_size, 6)
1659             self.failUnlessEqual(datalen, 36)
1660         d.addCallback(_check_encoding_parameters)
1661         return d
1662
1663
1664     def test_get_seqnum_first(self):
1665         self.write_test_share_to_server("si1")
1666         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1667         d = mr.get_seqnum()
1668         d.addCallback(lambda seqnum:
1669             self.failUnlessEqual(seqnum, 0))
1670         return d
1671
1672
1673     def test_get_root_hash_first(self):
1674         self.write_test_share_to_server("si1")
1675         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1676         d = mr.get_root_hash()
1677         d.addCallback(lambda root_hash:
1678             self.failUnlessEqual(root_hash, self.root_hash))
1679         return d
1680
1681
1682     def test_get_checkstring_first(self):
1683         self.write_test_share_to_server("si1")
1684         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
1685         d = mr.get_checkstring()
1686         d.addCallback(lambda checkstring:
1687             self.failUnlessEqual(checkstring, self.checkstring))
1688         return d
1689
1690
1691     def test_write_read_vectors(self):
1692         # When writing for us, the storage server will return to us a
1693         # read vector, along with its result. If a write fails because
1694         # the test vectors failed, this read vector can help us to
1695         # diagnose the problem. This test ensures that the read vector
1696         # is working appropriately.
1697         mw = self._make_new_mw("si1", 0)
1698
1699         for i in xrange(6):
1700             mw.put_block(self.block, i, self.salt)
1701         mw.put_encprivkey(self.encprivkey)
1702         mw.put_blockhashes(self.block_hash_tree)
1703         mw.put_sharehashes(self.share_hash_chain)
1704         mw.put_root_hash(self.root_hash)
1705         mw.put_signature(self.signature)
1706         mw.put_verification_key(self.verification_key)
1707         d = mw.finish_publishing()
1708         def _then(results):
1709             self.failUnless(len(results), 2)
1710             result, readv = results
1711             self.failUnless(result)
1712             self.failIf(readv)
1713             self.old_checkstring = mw.get_checkstring()
1714             mw.set_checkstring("")
1715         d.addCallback(_then)
1716         d.addCallback(lambda ignored:
1717             mw.finish_publishing())
1718         def _then_again(results):
1719             self.failUnlessEqual(len(results), 2)
1720             result, readvs = results
1721             self.failIf(result)
1722             self.failUnlessIn(0, readvs)
1723             readv = readvs[0][0]
1724             self.failUnlessEqual(readv, self.old_checkstring)
1725         d.addCallback(_then_again)
1726         # The checkstring remains the same for the rest of the process.
1727         return d
1728
1729
1730     def test_private_key_after_share_hash_chain(self):
1731         mw = self._make_new_mw("si1", 0)
1732         d = defer.succeed(None)
1733         for i in xrange(6):
1734             d.addCallback(lambda ignored, i=i:
1735                 mw.put_block(self.block, i, self.salt))
1736         d.addCallback(lambda ignored:
1737             mw.put_encprivkey(self.encprivkey))
1738         d.addCallback(lambda ignored:
1739             mw.put_sharehashes(self.share_hash_chain))
1740
1741         # Now try to put the private key again.
1742         d.addCallback(lambda ignored:
1743             self.shouldFail(LayoutInvalid, "test repeat private key",
1744                             None,
1745                             mw.put_encprivkey, self.encprivkey))
1746         return d
1747
1748
1749     def test_signature_after_verification_key(self):
1750         mw = self._make_new_mw("si1", 0)
1751         d = defer.succeed(None)
1752         # Put everything up to and including the verification key.
1753         for i in xrange(6):
1754             d.addCallback(lambda ignored, i=i:
1755                 mw.put_block(self.block, i, self.salt))
1756         d.addCallback(lambda ignored:
1757             mw.put_encprivkey(self.encprivkey))
1758         d.addCallback(lambda ignored:
1759             mw.put_blockhashes(self.block_hash_tree))
1760         d.addCallback(lambda ignored:
1761             mw.put_sharehashes(self.share_hash_chain))
1762         d.addCallback(lambda ignored:
1763             mw.put_root_hash(self.root_hash))
1764         d.addCallback(lambda ignored:
1765             mw.put_signature(self.signature))
1766         d.addCallback(lambda ignored:
1767             mw.put_verification_key(self.verification_key))
1768         # Now try to put the signature again. This should fail
1769         d.addCallback(lambda ignored:
1770             self.shouldFail(LayoutInvalid, "signature after verification",
1771                             None,
1772                             mw.put_signature, self.signature))
1773         return d
1774
1775
1776     def test_uncoordinated_write(self):
1777         # Make two mutable writers, both pointing to the same storage
1778         # server, both at the same storage index, and try writing to the
1779         # same share.
1780         mw1 = self._make_new_mw("si1", 0)
1781         mw2 = self._make_new_mw("si1", 0)
1782
1783         def _check_success(results):
1784             result, readvs = results
1785             self.failUnless(result)
1786
1787         def _check_failure(results):
1788             result, readvs = results
1789             self.failIf(result)
1790
1791         def _write_share(mw):
1792             for i in xrange(6):
1793                 mw.put_block(self.block, i, self.salt)
1794             mw.put_encprivkey(self.encprivkey)
1795             mw.put_blockhashes(self.block_hash_tree)
1796             mw.put_sharehashes(self.share_hash_chain)
1797             mw.put_root_hash(self.root_hash)
1798             mw.put_signature(self.signature)
1799             mw.put_verification_key(self.verification_key)
1800             return mw.finish_publishing()
1801         d = _write_share(mw1)
1802         d.addCallback(_check_success)
1803         d.addCallback(lambda ignored:
1804             _write_share(mw2))
1805         d.addCallback(_check_failure)
1806         return d
1807
1808
1809     def test_invalid_salt_size(self):
1810         # Salts need to be 16 bytes in size. Writes that attempt to
1811         # write more or less than this should be rejected.
1812         mw = self._make_new_mw("si1", 0)
1813         invalid_salt = "a" * 17 # 17 bytes
1814         another_invalid_salt = "b" * 15 # 15 bytes
1815         d = defer.succeed(None)
1816         d.addCallback(lambda ignored:
1817             self.shouldFail(LayoutInvalid, "salt too big",
1818                             None,
1819                             mw.put_block, self.block, 0, invalid_salt))
1820         d.addCallback(lambda ignored:
1821             self.shouldFail(LayoutInvalid, "salt too small",
1822                             None,
1823                             mw.put_block, self.block, 0,
1824                             another_invalid_salt))
1825         return d
1826
1827
1828     def test_write_test_vectors(self):
1829         # If we give the write proxy a bogus test vector at 
1830         # any point during the process, it should fail to write when we 
1831         # tell it to write.
1832         def _check_failure(results):
1833             self.failUnlessEqual(len(results), 2)
1834             res, d = results
1835             self.failIf(res)
1836
1837         def _check_success(results):
1838             self.failUnlessEqual(len(results), 2)
1839             res, d = results
1840             self.failUnless(results)
1841
1842         mw = self._make_new_mw("si1", 0)
1843         mw.set_checkstring("this is a lie")
1844         for i in xrange(6):
1845             mw.put_block(self.block, i, self.salt)
1846         mw.put_encprivkey(self.encprivkey)
1847         mw.put_blockhashes(self.block_hash_tree)
1848         mw.put_sharehashes(self.share_hash_chain)
1849         mw.put_root_hash(self.root_hash)
1850         mw.put_signature(self.signature)
1851         mw.put_verification_key(self.verification_key)
1852         d = mw.finish_publishing()
1853         d.addCallback(_check_failure)
1854         d.addCallback(lambda ignored:
1855             mw.set_checkstring(""))
1856         d.addCallback(lambda ignored:
1857             mw.finish_publishing())
1858         d.addCallback(_check_success)
1859         return d
1860
1861
1862     def serialize_blockhashes(self, blockhashes):
1863         return "".join(blockhashes)
1864
1865
1866     def serialize_sharehashes(self, sharehashes):
1867         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
1868                         for i in sorted(sharehashes.keys())])
1869         return ret
1870
1871
1872     def test_write(self):
1873         # This translates to a file with 6 6-byte segments, and with 2-byte
1874         # blocks.
1875         mw = self._make_new_mw("si1", 0)
1876         # Test writing some blocks.
1877         read = self.ss.remote_slot_readv
1878         expected_private_key_offset = struct.calcsize(MDMFHEADER)
1879         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
1880                                     PRIVATE_KEY_SIZE + \
1881                                     SIGNATURE_SIZE + \
1882                                     VERIFICATION_KEY_SIZE + \
1883                                     SHARE_HASH_CHAIN_SIZE
1884         written_block_size = 2 + len(self.salt)
1885         written_block = self.block + self.salt
1886         for i in xrange(6):
1887             mw.put_block(self.block, i, self.salt)
1888
1889         mw.put_encprivkey(self.encprivkey)
1890         mw.put_blockhashes(self.block_hash_tree)
1891         mw.put_sharehashes(self.share_hash_chain)
1892         mw.put_root_hash(self.root_hash)
1893         mw.put_signature(self.signature)
1894         mw.put_verification_key(self.verification_key)
1895         d = mw.finish_publishing()
1896         def _check_publish(results):
1897             self.failUnlessEqual(len(results), 2)
1898             result, ign = results
1899             self.failUnless(result, "publish failed")
1900             for i in xrange(6):
1901                 self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
1902                                 {0: [written_block]})
1903
1904             self.failUnlessEqual(len(self.encprivkey), 7)
1905             self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
1906                                  {0: [self.encprivkey]})
1907
1908             expected_block_hash_offset = expected_sharedata_offset + \
1909                         (6 * written_block_size)
1910             self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
1911             self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
1912                                  {0: [self.block_hash_tree_s]})
1913
1914             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
1915             self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
1916                                  {0: [self.share_hash_chain_s]})
1917
1918             self.failUnlessEqual(read("si1", [0], [(9, 32)]),
1919                                  {0: [self.root_hash]})
1920             expected_signature_offset = expected_share_hash_offset + \
1921                 len(self.share_hash_chain_s)
1922             self.failUnlessEqual(len(self.signature), 9)
1923             self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
1924                                  {0: [self.signature]})
1925
1926             expected_verification_key_offset = expected_signature_offset + len(self.signature)
1927             self.failUnlessEqual(len(self.verification_key), 6)
1928             self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
1929                                  {0: [self.verification_key]})
1930
1931             signable = mw.get_signable()
1932             verno, seq, roothash, k, n, segsize, datalen = \
1933                                             struct.unpack(">BQ32sBBQQ",
1934                                                           signable)
1935             self.failUnlessEqual(verno, 1)
1936             self.failUnlessEqual(seq, 0)
1937             self.failUnlessEqual(roothash, self.root_hash)
1938             self.failUnlessEqual(k, 3)
1939             self.failUnlessEqual(n, 10)
1940             self.failUnlessEqual(segsize, 6)
1941             self.failUnlessEqual(datalen, 36)
1942             expected_eof_offset = expected_block_hash_offset + \
1943                 len(self.block_hash_tree_s)
1944
1945             # Check the version number to make sure that it is correct.
1946             expected_version_number = struct.pack(">B", 1)
1947             self.failUnlessEqual(read("si1", [0], [(0, 1)]),
1948                                  {0: [expected_version_number]})
1949             # Check the sequence number to make sure that it is correct
1950             expected_sequence_number = struct.pack(">Q", 0)
1951             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
1952                                  {0: [expected_sequence_number]})
1953             # Check that the encoding parameters (k, N, segement size, data
1954             # length) are what they should be. These are  3, 10, 6, 36
1955             expected_k = struct.pack(">B", 3)
1956             self.failUnlessEqual(read("si1", [0], [(41, 1)]),
1957                                  {0: [expected_k]})
1958             expected_n = struct.pack(">B", 10)
1959             self.failUnlessEqual(read("si1", [0], [(42, 1)]),
1960                                  {0: [expected_n]})
1961             expected_segment_size = struct.pack(">Q", 6)
1962             self.failUnlessEqual(read("si1", [0], [(43, 8)]),
1963                                  {0: [expected_segment_size]})
1964             expected_data_length = struct.pack(">Q", 36)
1965             self.failUnlessEqual(read("si1", [0], [(51, 8)]),
1966                                  {0: [expected_data_length]})
1967             expected_offset = struct.pack(">Q", expected_private_key_offset)
1968             self.failUnlessEqual(read("si1", [0], [(59, 8)]),
1969                                  {0: [expected_offset]})
1970             expected_offset = struct.pack(">Q", expected_share_hash_offset)
1971             self.failUnlessEqual(read("si1", [0], [(67, 8)]),
1972                                  {0: [expected_offset]})
1973             expected_offset = struct.pack(">Q", expected_signature_offset)
1974             self.failUnlessEqual(read("si1", [0], [(75, 8)]),
1975                                  {0: [expected_offset]})
1976             expected_offset = struct.pack(">Q", expected_verification_key_offset)
1977             self.failUnlessEqual(read("si1", [0], [(83, 8)]),
1978                                  {0: [expected_offset]})
1979             expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
1980             self.failUnlessEqual(read("si1", [0], [(91, 8)]),
1981                                  {0: [expected_offset]})
1982             expected_offset = struct.pack(">Q", expected_sharedata_offset)
1983             self.failUnlessEqual(read("si1", [0], [(99, 8)]),
1984                                  {0: [expected_offset]})
1985             expected_offset = struct.pack(">Q", expected_block_hash_offset)
1986             self.failUnlessEqual(read("si1", [0], [(107, 8)]),
1987                                  {0: [expected_offset]})
1988             expected_offset = struct.pack(">Q", expected_eof_offset)
1989             self.failUnlessEqual(read("si1", [0], [(115, 8)]),
1990                                  {0: [expected_offset]})
1991         d.addCallback(_check_publish)
1992         return d
1993
1994     def _make_new_mw(self, si, share, datalength=36):
1995         # This is a file of size 36 bytes. Since it has a segment
1996         # size of 6, we know that it has 6 byte segments, which will
1997         # be split into blocks of 2 bytes because our FEC k
1998         # parameter is 3.
1999         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2000                                 6, datalength)
2001         return mw
2002
2003
2004     def test_write_rejected_with_too_many_blocks(self):
2005         mw = self._make_new_mw("si0", 0)
2006
2007         # Try writing too many blocks. We should not be able to write
2008         # more than 6
2009         # blocks into each share.
2010         d = defer.succeed(None)
2011         for i in xrange(6):
2012             d.addCallback(lambda ignored, i=i:
2013                 mw.put_block(self.block, i, self.salt))
2014         d.addCallback(lambda ignored:
2015             self.shouldFail(LayoutInvalid, "too many blocks",
2016                             None,
2017                             mw.put_block, self.block, 7, self.salt))
2018         return d
2019
2020
2021     def test_write_rejected_with_invalid_salt(self):
2022         # Try writing an invalid salt. Salts are 16 bytes -- any more or
2023         # less should cause an error.
2024         mw = self._make_new_mw("si1", 0)
2025         bad_salt = "a" * 17 # 17 bytes
2026         d = defer.succeed(None)
2027         d.addCallback(lambda ignored:
2028             self.shouldFail(LayoutInvalid, "test_invalid_salt",
2029                             None, mw.put_block, self.block, 7, bad_salt))
2030         return d
2031
2032
2033     def test_write_rejected_with_invalid_root_hash(self):
2034         # Try writing an invalid root hash. This should be SHA256d, and
2035         # 32 bytes long as a result.
2036         mw = self._make_new_mw("si2", 0)
2037         # 17 bytes != 32 bytes
2038         invalid_root_hash = "a" * 17
2039         d = defer.succeed(None)
2040         # Before this test can work, we need to put some blocks + salts,
2041         # a block hash tree, and a share hash tree. Otherwise, we'll see
2042         # failures that match what we are looking for, but are caused by
2043         # the constraints imposed on operation ordering.
2044         for i in xrange(6):
2045             d.addCallback(lambda ignored, i=i:
2046                 mw.put_block(self.block, i, self.salt))
2047         d.addCallback(lambda ignored:
2048             mw.put_encprivkey(self.encprivkey))
2049         d.addCallback(lambda ignored:
2050             mw.put_blockhashes(self.block_hash_tree))
2051         d.addCallback(lambda ignored:
2052             mw.put_sharehashes(self.share_hash_chain))
2053         d.addCallback(lambda ignored:
2054             self.shouldFail(LayoutInvalid, "invalid root hash",
2055                             None, mw.put_root_hash, invalid_root_hash))
2056         return d
2057
2058
2059     def test_write_rejected_with_invalid_blocksize(self):
2060         # The blocksize implied by the writer that we get from
2061         # _make_new_mw is 2bytes -- any more or any less than this
2062         # should be cause for failure, unless it is the tail segment, in
2063         # which case it may not be failure.
2064         invalid_block = "a"
2065         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2066                                              # one byte blocks
2067         # 1 bytes != 2 bytes
2068         d = defer.succeed(None)
2069         d.addCallback(lambda ignored, invalid_block=invalid_block:
2070             self.shouldFail(LayoutInvalid, "test blocksize too small",
2071                             None, mw.put_block, invalid_block, 0,
2072                             self.salt))
2073         invalid_block = invalid_block * 3
2074         # 3 bytes != 2 bytes
2075         d.addCallback(lambda ignored:
2076             self.shouldFail(LayoutInvalid, "test blocksize too large",
2077                             None,
2078                             mw.put_block, invalid_block, 0, self.salt))
2079         for i in xrange(5):
2080             d.addCallback(lambda ignored, i=i:
2081                 mw.put_block(self.block, i, self.salt))
2082         # Try to put an invalid tail segment
2083         d.addCallback(lambda ignored:
2084             self.shouldFail(LayoutInvalid, "test invalid tail segment",
2085                             None,
2086                             mw.put_block, self.block, 5, self.salt))
2087         valid_block = "a"
2088         d.addCallback(lambda ignored:
2089             mw.put_block(valid_block, 5, self.salt))
2090         return d
2091
2092
2093     def test_write_enforces_order_constraints(self):
2094         # We require that the MDMFSlotWriteProxy be interacted with in a
2095         # specific way.
2096         # That way is:
2097         # 0: __init__
2098         # 1: write blocks and salts
2099         # 2: Write the encrypted private key
2100         # 3: Write the block hashes
2101         # 4: Write the share hashes
2102         # 5: Write the root hash and salt hash
2103         # 6: Write the signature and verification key
2104         # 7: Write the file.
2105         # 
2106         # Some of these can be performed out-of-order, and some can't.
2107         # The dependencies that I want to test here are:
2108         #  - Private key before block hashes
2109         #  - share hashes and block hashes before root hash
2110         #  - root hash before signature
2111         #  - signature before verification key
2112         mw0 = self._make_new_mw("si0", 0)
2113         # Write some shares
2114         d = defer.succeed(None)
2115         for i in xrange(6):
2116             d.addCallback(lambda ignored, i=i:
2117                 mw0.put_block(self.block, i, self.salt))
2118
2119         # Try to write the share hash chain without writing the
2120         # encrypted private key
2121         d.addCallback(lambda ignored:
2122             self.shouldFail(LayoutInvalid, "share hash chain before "
2123                                            "private key",
2124                             None,
2125                             mw0.put_sharehashes, self.share_hash_chain))
2126         # Write the private key.
2127         d.addCallback(lambda ignored:
2128             mw0.put_encprivkey(self.encprivkey))
2129
2130         # Now write the block hashes and try again
2131         d.addCallback(lambda ignored:
2132             mw0.put_blockhashes(self.block_hash_tree))
2133
2134         # We haven't yet put the root hash on the share, so we shouldn't
2135         # be able to sign it.
2136         d.addCallback(lambda ignored:
2137             self.shouldFail(LayoutInvalid, "signature before root hash",
2138                             None, mw0.put_signature, self.signature))
2139
2140         d.addCallback(lambda ignored:
2141             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2142
2143         # ..and, since that fails, we also shouldn't be able to put the
2144         # verification key.
2145         d.addCallback(lambda ignored:
2146             self.shouldFail(LayoutInvalid, "key before signature",
2147                             None, mw0.put_verification_key,
2148                             self.verification_key))
2149
2150         # Now write the share hashes.
2151         d.addCallback(lambda ignored:
2152             mw0.put_sharehashes(self.share_hash_chain))
2153         # We should be able to write the root hash now too
2154         d.addCallback(lambda ignored:
2155             mw0.put_root_hash(self.root_hash))
2156
2157         # We should still be unable to put the verification key
2158         d.addCallback(lambda ignored:
2159             self.shouldFail(LayoutInvalid, "key before signature",
2160                             None, mw0.put_verification_key,
2161                             self.verification_key))
2162
2163         d.addCallback(lambda ignored:
2164             mw0.put_signature(self.signature))
2165
2166         # We shouldn't be able to write the offsets to the remote server
2167         # until the offset table is finished; IOW, until we have written
2168         # the verification key.
2169         d.addCallback(lambda ignored:
2170             self.shouldFail(LayoutInvalid, "offsets before verification key",
2171                             None,
2172                             mw0.finish_publishing))
2173
2174         d.addCallback(lambda ignored:
2175             mw0.put_verification_key(self.verification_key))
2176         return d
2177
2178
2179     def test_end_to_end(self):
2180         mw = self._make_new_mw("si1", 0)
2181         # Write a share using the mutable writer, and make sure that the
2182         # reader knows how to read everything back to us.
2183         d = defer.succeed(None)
2184         for i in xrange(6):
2185             d.addCallback(lambda ignored, i=i:
2186                 mw.put_block(self.block, i, self.salt))
2187         d.addCallback(lambda ignored:
2188             mw.put_encprivkey(self.encprivkey))
2189         d.addCallback(lambda ignored:
2190             mw.put_blockhashes(self.block_hash_tree))
2191         d.addCallback(lambda ignored:
2192             mw.put_sharehashes(self.share_hash_chain))
2193         d.addCallback(lambda ignored:
2194             mw.put_root_hash(self.root_hash))
2195         d.addCallback(lambda ignored:
2196             mw.put_signature(self.signature))
2197         d.addCallback(lambda ignored:
2198             mw.put_verification_key(self.verification_key))
2199         d.addCallback(lambda ignored:
2200             mw.finish_publishing())
2201
2202         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2203         def _check_block_and_salt((block, salt)):
2204             self.failUnlessEqual(block, self.block)
2205             self.failUnlessEqual(salt, self.salt)
2206
2207         for i in xrange(6):
2208             d.addCallback(lambda ignored, i=i:
2209                 mr.get_block_and_salt(i))
2210             d.addCallback(_check_block_and_salt)
2211
2212         d.addCallback(lambda ignored:
2213             mr.get_encprivkey())
2214         d.addCallback(lambda encprivkey:
2215             self.failUnlessEqual(self.encprivkey, encprivkey))
2216
2217         d.addCallback(lambda ignored:
2218             mr.get_blockhashes())
2219         d.addCallback(lambda blockhashes:
2220             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2221
2222         d.addCallback(lambda ignored:
2223             mr.get_sharehashes())
2224         d.addCallback(lambda sharehashes:
2225             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2226
2227         d.addCallback(lambda ignored:
2228             mr.get_signature())
2229         d.addCallback(lambda signature:
2230             self.failUnlessEqual(signature, self.signature))
2231
2232         d.addCallback(lambda ignored:
2233             mr.get_verification_key())
2234         d.addCallback(lambda verification_key:
2235             self.failUnlessEqual(verification_key, self.verification_key))
2236
2237         d.addCallback(lambda ignored:
2238             mr.get_seqnum())
2239         d.addCallback(lambda seqnum:
2240             self.failUnlessEqual(seqnum, 0))
2241
2242         d.addCallback(lambda ignored:
2243             mr.get_root_hash())
2244         d.addCallback(lambda root_hash:
2245             self.failUnlessEqual(self.root_hash, root_hash))
2246
2247         d.addCallback(lambda ignored:
2248             mr.get_encoding_parameters())
2249         def _check_encoding_parameters((k, n, segsize, datalen)):
2250             self.failUnlessEqual(k, 3)
2251             self.failUnlessEqual(n, 10)
2252             self.failUnlessEqual(segsize, 6)
2253             self.failUnlessEqual(datalen, 36)
2254         d.addCallback(_check_encoding_parameters)
2255
2256         d.addCallback(lambda ignored:
2257             mr.get_checkstring())
2258         d.addCallback(lambda checkstring:
2259             self.failUnlessEqual(checkstring, mw.get_checkstring()))
2260         return d
2261
2262
2263     def test_is_sdmf(self):
2264         # The MDMFSlotReadProxy should also know how to read SDMF files,
2265         # since it will encounter them on the grid. Callers use the
2266         # is_sdmf method to test this.
2267         self.write_sdmf_share_to_server("si1")
2268         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2269         d = mr.is_sdmf()
2270         d.addCallback(lambda issdmf:
2271             self.failUnless(issdmf))
2272         return d
2273
2274
2275     def test_reads_sdmf(self):
2276         # The slot read proxy should, naturally, know how to tell us
2277         # about data in the SDMF format
2278         self.write_sdmf_share_to_server("si1")
2279         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2280         d = defer.succeed(None)
2281         d.addCallback(lambda ignored:
2282             mr.is_sdmf())
2283         d.addCallback(lambda issdmf:
2284             self.failUnless(issdmf))
2285
2286         # What do we need to read?
2287         #  - The sharedata
2288         #  - The salt
2289         d.addCallback(lambda ignored:
2290             mr.get_block_and_salt(0))
2291         def _check_block_and_salt(results):
2292             block, salt = results
2293             # Our original file is 36 bytes long. Then each share is 12
2294             # bytes in size. The share is composed entirely of the
2295             # letter a. self.block contains 2 as, so 6 * self.block is
2296             # what we are looking for.
2297             self.failUnlessEqual(block, self.block * 6)
2298             self.failUnlessEqual(salt, self.salt)
2299         d.addCallback(_check_block_and_salt)
2300
2301         #  - The blockhashes
2302         d.addCallback(lambda ignored:
2303             mr.get_blockhashes())
2304         d.addCallback(lambda blockhashes:
2305             self.failUnlessEqual(self.block_hash_tree,
2306                                  blockhashes,
2307                                  blockhashes))
2308         #  - The sharehashes
2309         d.addCallback(lambda ignored:
2310             mr.get_sharehashes())
2311         d.addCallback(lambda sharehashes:
2312             self.failUnlessEqual(self.share_hash_chain,
2313                                  sharehashes))
2314         #  - The keys
2315         d.addCallback(lambda ignored:
2316             mr.get_encprivkey())
2317         d.addCallback(lambda encprivkey:
2318             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
2319         d.addCallback(lambda ignored:
2320             mr.get_verification_key())
2321         d.addCallback(lambda verification_key:
2322             self.failUnlessEqual(verification_key,
2323                                  self.verification_key,
2324                                  verification_key))
2325         #  - The signature
2326         d.addCallback(lambda ignored:
2327             mr.get_signature())
2328         d.addCallback(lambda signature:
2329             self.failUnlessEqual(signature, self.signature, signature))
2330
2331         #  - The sequence number
2332         d.addCallback(lambda ignored:
2333             mr.get_seqnum())
2334         d.addCallback(lambda seqnum:
2335             self.failUnlessEqual(seqnum, 0, seqnum))
2336
2337         #  - The root hash
2338         d.addCallback(lambda ignored:
2339             mr.get_root_hash())
2340         d.addCallback(lambda root_hash:
2341             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
2342         return d
2343
2344
2345     def test_only_reads_one_segment_sdmf(self):
2346         # SDMF shares have only one segment, so it doesn't make sense to
2347         # read more segments than that. The reader should know this and
2348         # complain if we try to do that.
2349         self.write_sdmf_share_to_server("si1")
2350         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2351         d = defer.succeed(None)
2352         d.addCallback(lambda ignored:
2353             mr.is_sdmf())
2354         d.addCallback(lambda issdmf:
2355             self.failUnless(issdmf))
2356         d.addCallback(lambda ignored:
2357             self.shouldFail(LayoutInvalid, "test bad segment",
2358                             None,
2359                             mr.get_block_and_salt, 1))
2360         return d
2361
2362
2363     def test_read_with_prefetched_mdmf_data(self):
2364         # The MDMFSlotReadProxy will prefill certain fields if you pass
2365         # it data that you have already fetched. This is useful for
2366         # cases like the Servermap, which prefetches ~2kb of data while
2367         # finding out which shares are on the remote peer so that it
2368         # doesn't waste round trips.
2369         mdmf_data = self.build_test_mdmf_share()
2370         self.write_test_share_to_server("si1")
2371         def _make_mr(ignored, length):
2372             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
2373             return mr
2374
2375         d = defer.succeed(None)
2376         # This should be enough to fill in both the encoding parameters
2377         # and the table of offsets, which will complete the version
2378         # information tuple.
2379         d.addCallback(_make_mr, 123)
2380         d.addCallback(lambda mr:
2381             mr.get_verinfo())
2382         def _check_verinfo(verinfo):
2383             self.failUnless(verinfo)
2384             self.failUnlessEqual(len(verinfo), 9)
2385             (seqnum,
2386              root_hash,
2387              salt_hash,
2388              segsize,
2389              datalen,
2390              k,
2391              n,
2392              prefix,
2393              offsets) = verinfo
2394             self.failUnlessEqual(seqnum, 0)
2395             self.failUnlessEqual(root_hash, self.root_hash)
2396             self.failUnlessEqual(segsize, 6)
2397             self.failUnlessEqual(datalen, 36)
2398             self.failUnlessEqual(k, 3)
2399             self.failUnlessEqual(n, 10)
2400             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2401                                           1,
2402                                           seqnum,
2403                                           root_hash,
2404                                           k,
2405                                           n,
2406                                           segsize,
2407                                           datalen)
2408             self.failUnlessEqual(expected_prefix, prefix)
2409             self.failUnlessEqual(self.rref.read_count, 0)
2410         d.addCallback(_check_verinfo)
2411         # This is not enough data to read a block and a share, so the
2412         # wrapper should attempt to read this from the remote server.
2413         d.addCallback(_make_mr, 123)
2414         d.addCallback(lambda mr:
2415             mr.get_block_and_salt(0))
2416         def _check_block_and_salt((block, salt)):
2417             self.failUnlessEqual(block, self.block)
2418             self.failUnlessEqual(salt, self.salt)
2419             self.failUnlessEqual(self.rref.read_count, 1)
2420         # This should be enough data to read one block.
2421         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
2422         d.addCallback(lambda mr:
2423             mr.get_block_and_salt(0))
2424         d.addCallback(_check_block_and_salt)
2425         return d
2426
2427
2428     def test_read_with_prefetched_sdmf_data(self):
2429         sdmf_data = self.build_test_sdmf_share()
2430         self.write_sdmf_share_to_server("si1")
2431         def _make_mr(ignored, length):
2432             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
2433             return mr
2434
2435         d = defer.succeed(None)
2436         # This should be enough to get us the encoding parameters,
2437         # offset table, and everything else we need to build a verinfo
2438         # string.
2439         d.addCallback(_make_mr, 123)
2440         d.addCallback(lambda mr:
2441             mr.get_verinfo())
2442         def _check_verinfo(verinfo):
2443             self.failUnless(verinfo)
2444             self.failUnlessEqual(len(verinfo), 9)
2445             (seqnum,
2446              root_hash,
2447              salt,
2448              segsize,
2449              datalen,
2450              k,
2451              n,
2452              prefix,
2453              offsets) = verinfo
2454             self.failUnlessEqual(seqnum, 0)
2455             self.failUnlessEqual(root_hash, self.root_hash)
2456             self.failUnlessEqual(salt, self.salt)
2457             self.failUnlessEqual(segsize, 36)
2458             self.failUnlessEqual(datalen, 36)
2459             self.failUnlessEqual(k, 3)
2460             self.failUnlessEqual(n, 10)
2461             expected_prefix = struct.pack(SIGNED_PREFIX,
2462                                           0,
2463                                           seqnum,
2464                                           root_hash,
2465                                           salt,
2466                                           k,
2467                                           n,
2468                                           segsize,
2469                                           datalen)
2470             self.failUnlessEqual(expected_prefix, prefix)
2471             self.failUnlessEqual(self.rref.read_count, 0)
2472         d.addCallback(_check_verinfo)
2473         # This shouldn't be enough to read any share data.
2474         d.addCallback(_make_mr, 123)
2475         d.addCallback(lambda mr:
2476             mr.get_block_and_salt(0))
2477         def _check_block_and_salt((block, salt)):
2478             self.failUnlessEqual(block, self.block * 6)
2479             self.failUnlessEqual(salt, self.salt)
2480             # TODO: Fix the read routine so that it reads only the data
2481             #       that it has cached if it can't read all of it.
2482             self.failUnlessEqual(self.rref.read_count, 2)
2483
2484         # This should be enough to read share data.
2485         d.addCallback(_make_mr, self.offsets['share_data'])
2486         d.addCallback(lambda mr:
2487             mr.get_block_and_salt(0))
2488         d.addCallback(_check_block_and_salt)
2489         return d
2490
2491
2492     def test_read_with_empty_mdmf_file(self):
2493         # Some tests upload a file with no contents to test things
2494         # unrelated to the actual handling of the content of the file.
2495         # The reader should behave intelligently in these cases.
2496         self.write_test_share_to_server("si1", empty=True)
2497         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2498         # We should be able to get the encoding parameters, and they
2499         # should be correct.
2500         d = defer.succeed(None)
2501         d.addCallback(lambda ignored:
2502             mr.get_encoding_parameters())
2503         def _check_encoding_parameters(params):
2504             self.failUnlessEqual(len(params), 4)
2505             k, n, segsize, datalen = params
2506             self.failUnlessEqual(k, 3)
2507             self.failUnlessEqual(n, 10)
2508             self.failUnlessEqual(segsize, 0)
2509             self.failUnlessEqual(datalen, 0)
2510         d.addCallback(_check_encoding_parameters)
2511
2512         # We should not be able to fetch a block, since there are no
2513         # blocks to fetch
2514         d.addCallback(lambda ignored:
2515             self.shouldFail(LayoutInvalid, "get block on empty file",
2516                             None,
2517                             mr.get_block_and_salt, 0))
2518         return d
2519
2520
2521     def test_read_with_empty_sdmf_file(self):
2522         self.write_sdmf_share_to_server("si1", empty=True)
2523         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2524         # We should be able to get the encoding parameters, and they
2525         # should be correct
2526         d = defer.succeed(None)
2527         d.addCallback(lambda ignored:
2528             mr.get_encoding_parameters())
2529         def _check_encoding_parameters(params):
2530             self.failUnlessEqual(len(params), 4)
2531             k, n, segsize, datalen = params
2532             self.failUnlessEqual(k, 3)
2533             self.failUnlessEqual(n, 10)
2534             self.failUnlessEqual(segsize, 0)
2535             self.failUnlessEqual(datalen, 0)
2536         d.addCallback(_check_encoding_parameters)
2537
2538         # It does not make sense to get a block in this format, so we
2539         # should not be able to.
2540         d.addCallback(lambda ignored:
2541             self.shouldFail(LayoutInvalid, "get block on an empty file",
2542                             None,
2543                             mr.get_block_and_salt, 0))
2544         return d
2545
2546
2547     def test_verinfo_with_sdmf_file(self):
2548         self.write_sdmf_share_to_server("si1")
2549         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2550         # We should be able to get the version information.
2551         d = defer.succeed(None)
2552         d.addCallback(lambda ignored:
2553             mr.get_verinfo())
2554         def _check_verinfo(verinfo):
2555             self.failUnless(verinfo)
2556             self.failUnlessEqual(len(verinfo), 9)
2557             (seqnum,
2558              root_hash,
2559              salt,
2560              segsize,
2561              datalen,
2562              k,
2563              n,
2564              prefix,
2565              offsets) = verinfo
2566             self.failUnlessEqual(seqnum, 0)
2567             self.failUnlessEqual(root_hash, self.root_hash)
2568             self.failUnlessEqual(salt, self.salt)
2569             self.failUnlessEqual(segsize, 36)
2570             self.failUnlessEqual(datalen, 36)
2571             self.failUnlessEqual(k, 3)
2572             self.failUnlessEqual(n, 10)
2573             expected_prefix = struct.pack(">BQ32s16s BBQQ",
2574                                           0,
2575                                           seqnum,
2576                                           root_hash,
2577                                           salt,
2578                                           k,
2579                                           n,
2580                                           segsize,
2581                                           datalen)
2582             self.failUnlessEqual(prefix, expected_prefix)
2583             self.failUnlessEqual(offsets, self.offsets)
2584         d.addCallback(_check_verinfo)
2585         return d
2586
2587
2588     def test_verinfo_with_mdmf_file(self):
2589         self.write_test_share_to_server("si1")
2590         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2591         d = defer.succeed(None)
2592         d.addCallback(lambda ignored:
2593             mr.get_verinfo())
2594         def _check_verinfo(verinfo):
2595             self.failUnless(verinfo)
2596             self.failUnlessEqual(len(verinfo), 9)
2597             (seqnum,
2598              root_hash,
2599              IV,
2600              segsize,
2601              datalen,
2602              k,
2603              n,
2604              prefix,
2605              offsets) = verinfo
2606             self.failUnlessEqual(seqnum, 0)
2607             self.failUnlessEqual(root_hash, self.root_hash)
2608             self.failIf(IV)
2609             self.failUnlessEqual(segsize, 6)
2610             self.failUnlessEqual(datalen, 36)
2611             self.failUnlessEqual(k, 3)
2612             self.failUnlessEqual(n, 10)
2613             expected_prefix = struct.pack(">BQ32s BBQQ",
2614                                           1,
2615                                           seqnum,
2616                                           root_hash,
2617                                           k,
2618                                           n,
2619                                           segsize,
2620                                           datalen)
2621             self.failUnlessEqual(prefix, expected_prefix)
2622             self.failUnlessEqual(offsets, self.offsets)
2623         d.addCallback(_check_verinfo)
2624         return d
2625
2626
2627     def test_reader_queue(self):
2628         self.write_test_share_to_server('si1')
2629         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2630         d1 = mr.get_block_and_salt(0, queue=True)
2631         d2 = mr.get_blockhashes(queue=True)
2632         d3 = mr.get_sharehashes(queue=True)
2633         d4 = mr.get_signature(queue=True)
2634         d5 = mr.get_verification_key(queue=True)
2635         dl = defer.DeferredList([d1, d2, d3, d4, d5])
2636         mr.flush()
2637         def _print(results):
2638             self.failUnlessEqual(len(results), 5)
2639             # We have one read for version information and offsets, and
2640             # one for everything else.
2641             self.failUnlessEqual(self.rref.read_count, 2)
2642             block, salt = results[0][1] # results[0] is a boolean that says
2643                                            # whether or not the operation
2644                                            # worked.
2645             self.failUnlessEqual(self.block, block)
2646             self.failUnlessEqual(self.salt, salt)
2647
2648             blockhashes = results[1][1]
2649             self.failUnlessEqual(self.block_hash_tree, blockhashes)
2650
2651             sharehashes = results[2][1]
2652             self.failUnlessEqual(self.share_hash_chain, sharehashes)
2653
2654             signature = results[3][1]
2655             self.failUnlessEqual(self.signature, signature)
2656
2657             verification_key = results[4][1]
2658             self.failUnlessEqual(self.verification_key, verification_key)
2659         dl.addCallback(_print)
2660         return dl
2661
2662
2663     def test_sdmf_writer(self):
2664         # Go through the motions of writing an SDMF share to the storage
2665         # server. Then read the storage server to see that the share got
2666         # written in the way that we think it should have. 
2667
2668         # We do this first so that the necessary instance variables get
2669         # set the way we want them for the tests below.
2670         data = self.build_test_sdmf_share()
2671         sdmfr = SDMFSlotWriteProxy(0,
2672                                    self.rref,
2673                                    "si1",
2674                                    self.secrets,
2675                                    0, 3, 10, 36, 36)
2676         # Put the block and salt.
2677         sdmfr.put_block(self.blockdata, 0, self.salt)
2678
2679         # Put the encprivkey
2680         sdmfr.put_encprivkey(self.encprivkey)
2681
2682         # Put the block and share hash chains
2683         sdmfr.put_blockhashes(self.block_hash_tree)
2684         sdmfr.put_sharehashes(self.share_hash_chain)
2685         sdmfr.put_root_hash(self.root_hash)
2686
2687         # Put the signature
2688         sdmfr.put_signature(self.signature)
2689
2690         # Put the verification key
2691         sdmfr.put_verification_key(self.verification_key)
2692
2693         # Now check to make sure that nothing has been written yet.
2694         self.failUnlessEqual(self.rref.write_count, 0)
2695
2696         # Now finish publishing
2697         d = sdmfr.finish_publishing()
2698         def _then(ignored):
2699             self.failUnlessEqual(self.rref.write_count, 1)
2700             read = self.ss.remote_slot_readv
2701             self.failUnlessEqual(read("si1", [0], [(0, len(data))]),
2702                                  {0: [data]})
2703         d.addCallback(_then)
2704         return d
2705
2706
2707     def test_sdmf_writer_preexisting_share(self):
2708         data = self.build_test_sdmf_share()
2709         self.write_sdmf_share_to_server("si1")
2710
2711         # Now there is a share on the storage server. To successfully
2712         # write, we need to set the checkstring correctly. When we
2713         # don't, no write should occur.
2714         sdmfw = SDMFSlotWriteProxy(0,
2715                                    self.rref,
2716                                    "si1",
2717                                    self.secrets,
2718                                    1, 3, 10, 36, 36)
2719         sdmfw.put_block(self.blockdata, 0, self.salt)
2720
2721         # Put the encprivkey
2722         sdmfw.put_encprivkey(self.encprivkey)
2723
2724         # Put the block and share hash chains
2725         sdmfw.put_blockhashes(self.block_hash_tree)
2726         sdmfw.put_sharehashes(self.share_hash_chain)
2727
2728         # Put the root hash
2729         sdmfw.put_root_hash(self.root_hash)
2730
2731         # Put the signature
2732         sdmfw.put_signature(self.signature)
2733
2734         # Put the verification key
2735         sdmfw.put_verification_key(self.verification_key)
2736
2737         # We shouldn't have a checkstring yet
2738         self.failUnlessEqual(sdmfw.get_checkstring(), "")
2739
2740         d = sdmfw.finish_publishing()
2741         def _then(results):
2742             self.failIf(results[0])
2743             # this is the correct checkstring
2744             self._expected_checkstring = results[1][0][0]
2745             return self._expected_checkstring
2746
2747         d.addCallback(_then)
2748         d.addCallback(sdmfw.set_checkstring)
2749         d.addCallback(lambda ignored:
2750             sdmfw.get_checkstring())
2751         d.addCallback(lambda checkstring:
2752             self.failUnlessEqual(checkstring, self._expected_checkstring))
2753         d.addCallback(lambda ignored:
2754             sdmfw.finish_publishing())
2755         def _then_again(results):
2756             self.failUnless(results[0])
2757             read = self.ss.remote_slot_readv
2758             self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2759                                  {0: [struct.pack(">Q", 1)]})
2760             self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]),
2761                                  {0: [data[9:]]})
2762         d.addCallback(_then_again)
2763         return d
2764
2765
class Stats(unittest.TestCase):
    """Checks the latency statistics reported by StorageServer.get_latencies()."""

    def setUp(self):
        self._lease_secret = itertools.count()
        self.sparent = LoggingServiceParent()

    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        """Return the relative working directory used for the test `name`."""
        return os.path.join("storage", "Server", name)

    def create(self, name):
        """Build a StorageServer under workdir(name), parented to self.sparent."""
        server = StorageServer(self.workdir(name), "\x00" * 20)
        server.setServiceParent(self.sparent)
        return server

    def test_latencies(self):
        """Feed known samples into add_latency() and check the summary.

        Each category gets a different number of samples so that the
        expectations cover a category with more samples than are retained
        ("allocate"), ones with enough samples for every percentile, and
        ones where some or all statistics are expected to be None.
        """
        ss = self.create("test_latencies")

        # (category, number of samples, factor applied to the sample index)
        feeds = [("allocate", 10000, 1.0),
                 ("renew", 1000, 1.0),
                 ("write", 20, 1.0),
                 ("cancel", 10, 2.0)]
        for category, how_many, factor in feeds:
            for i in range(how_many):
                ss.add_latency(category, factor * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))

        stat_names = ["mean",
                      "01_0_percentile", "10_0_percentile",
                      "50_0_percentile", "90_0_percentile",
                      "95_0_percentile", "99_0_percentile",
                      "99_9_percentile"]

        # (category, expected number of retained samples, expected value for
        # each entry of stat_names; None means the statistic must be None)
        expectations = [
            ("allocate", 1000,
             [9500, 9010, 9100, 9500, 9900, 9950, 9990, 9999]),
            ("renew", 1000,
             [500, 10, 100, 500, 900, 950, 990, 999]),
            ("write", 20,
             [9, None, 2, 10, 18, 19, None, None]),
            ("cancel", 10,
             [9, None, 2, 10, 18, None, None, None]),
            ("get", 1,
             [None] * len(stat_names)),
        ]

        for category, retained, wanted in expectations:
            self.failUnlessEqual(len(ss.latencies[category]), retained)
            observed = output[category]
            for stat, want in zip(stat_names, wanted):
                if want is None:
                    self.failUnless(observed[stat] is None, output)
                else:
                    # numeric expectations only need to match to within 1
                    self.failUnless(abs(observed[stat] - want) < 1, output)
2849
def remove_tags(s):
    """Return s with every <...> tag replaced by a space.

    After the tags are replaced, each run of whitespace is collapsed into a
    single space, which makes rendered HTML easy to search for phrases.
    """
    without_tags = re.sub(r'<[^>]*>', ' ', s)
    return re.sub(r'\s+', ' ', without_tags)
2854
class MyBucketCountingCrawler(BucketCountingCrawler):
    """BucketCountingCrawler that fires one hook Deferred per finished prefix.

    The hook_ds attribute is not defined here: whoever uses this class must
    assign it (a list of Deferreds) before the crawler finishes a prefix.
    """
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        pending = self.hook_ds
        if not pending:
            return
        # fire the oldest remaining hook; later prefixes fire later hooks
        pending.pop(0).callback(None)
2861
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler."""
    def add_bucket_counter(self):
        # state file lives in the server's storedir
        counter = MyBucketCountingCrawler(
            self, os.path.join(self.storedir, "bucket_counter.state"))
        self.bucket_counter = counter
        counter.setServiceParent(self)
2867
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
    """Tests for the storage server's bucket-counting crawler.

    Each test builds a storage server under storage/BucketCounter/, attaches
    it to the MultiService created in setUp, and inspects the crawler state
    and/or the rendered StorageStatus page at different points of a cycle.
    """

    def setUp(self):
        # parent service for the storage server each test creates
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_bucket_counter(self):
        """Render the status page before, during, and after the first cycle."""
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        w = StorageStatus(ss)

        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        s = remove_tags(html)
        self.failUnlessIn("Accepting new shares: Yes", s)
        self.failUnlessIn("Reserved space: - 0 B (0)", s)
        self.failUnlessIn("Total buckets: Not computed yet", s)
        self.failUnlessIn("Next crawl in", s)

        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        # first prefix

        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            if state["last-complete-prefix"] is None:
                # no prefix finished yet: re-run this check on a later turn
                d2 = fireEventually()
                d2.addCallback(_check)
                return d2
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn(" Current crawl ", s)
            self.failUnlessIn(" (next work in ", s)
        d.addCallback(_check)

        # now give it enough time to complete a full cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            # put back the slice time that was overridden above
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("Total buckets: 0 (the number of", s)
            # accept either rendering of the time until the next crawl
            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
        d.addCallback(_check2)
        return d

    def test_bucket_counter_cleanup(self):
        """Bogus entries added to the state mid-cycle are gone after the cycle."""
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        d = fireEventually()

        def _after_first_prefix(ignored):
            # this reads the crawler's .state attribute directly (not
            # get_state()) and modifies it in place below
            state = ss.bucket_counter.state
            if state["last-complete-prefix"] is None:
                # no prefix finished yet: re-run this check on a later turn
                d2 = fireEventually()
                d2.addCallback(_after_first_prefix)
                return d2
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)

        # now give it enough time to complete a cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            # both bogus entries injected above must have been removed
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)
        return d

    def test_bucket_counter_eta(self):
        """The status page shows an ETA once at least one prefix has finished."""
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)

        # the test's result: fired by _check_3 on success, or by the errback
        # of any hook whose check fails
        d = defer.Deferred()

        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)

        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)

        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
            d.callback("done")

        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)

        # attach the server only after the hooks are wired up
        ss.setServiceParent(self.s)
        return d
3012
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler that can be told to pause after its first bucket.

    While stop_after_first_bucket is set, the first call to process_bucket()
    clears the flag and sets cpu_slice to -1.0; the next call to yielding()
    then sets cpu_slice to 500.
    """
    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # one-shot: disarm the flag, then shrink the slice
        # NOTE(review): a negative cpu_slice presumably makes the crawler
        # yield right away -- confirm against the crawler implementation
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        if self.stop_after_first_bucket:
            # still armed: leave cpu_slice alone
            return
        self.cpu_slice = 500
3023
class BrokenStatResults:
    """Empty attribute holder standing in for an os.stat() result."""
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler whose stat() results never have st_blocks."""
    def stat(self, fn):
        """Return a copy of os.stat(fn) without the st_blocks attribute.

        Every attribute of the real result whose name does not start with
        an underscore is copied, except st_blocks.
        """
        real = os.stat(fn)
        stripped = BrokenStatResults()
        for attrname in dir(real):
            wanted = not attrname.startswith("_") and attrname != "st_blocks"
            if wanted:
                setattr(stripped, attrname, getattr(real, attrname))
        return stripped
3037
class InstrumentedStorageServer(StorageServer):
    """StorageServer that names InstrumentedLeaseCheckingCrawler as its lease checker class."""
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer that names No_ST_BLOCKS_LeaseCheckingCrawler as its lease checker class."""
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
3042
3043 class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3044
3045     def setUp(self):
3046         self.s = service.MultiService()
3047         self.s.startService()
3048     def tearDown(self):
3049         return self.s.stopService()
3050
3051     def make_shares(self, ss):
3052         def make(si):
3053             return (si, hashutil.tagged_hash("renew", si),
3054                     hashutil.tagged_hash("cancel", si))
3055         def make_mutable(si):
3056             return (si, hashutil.tagged_hash("renew", si),
3057                     hashutil.tagged_hash("cancel", si),
3058                     hashutil.tagged_hash("write-enabler", si))
3059         def make_extra_lease(si, num):
3060             return (hashutil.tagged_hash("renew-%d" % num, si),
3061                     hashutil.tagged_hash("cancel-%d" % num, si))
3062
3063         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3064         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3065         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3066         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3067         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3068         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3069         sharenums = [0]
3070         canary = FakeCanary()
3071         # note: 'tahoe debug dump-share' will not handle this file, since the
3072         # inner contents are not a valid CHK share
3073         data = "\xff" * 1000
3074
3075         a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
3076                                          1000, canary)
3077         w[0].remote_write(0, data)
3078         w[0].remote_close()
3079
3080         a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
3081                                          1000, canary)
3082         w[0].remote_write(0, data)
3083         w[0].remote_close()
3084         ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
3085
3086         writev = ss.remote_slot_testv_and_readv_and_writev
3087         writev(mutable_si_2, (we2, rs2, cs2),
3088                {0: ([], [(0,data)], len(data))}, [])
3089         writev(mutable_si_3, (we3, rs3, cs3),
3090                {0: ([], [(0,data)], len(data))}, [])
3091         ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
3092
3093         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3094         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3095         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3096
    def test_basic(self):
        """Run one lease-checker cycle with expiration disabled.

        Checks the crawler state and the rendered status page before the
        crawl starts, right after the first bucket, and after the first
        cycle, and verifies that no shares or leases were removed.
        """
        basedir = "storage/LeaseCrawler/basic"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.cpu_slice = 500
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        # add a non-sharefile to exercise another code path
        fn = os.path.join(ss.sharedir,
                          storage_index_to_dir(immutable_si_0),
                          "not-a-share")
        f = open(fn, "wb")
        f.write("I am not a share.\n")
        f.close()

        # this is before the crawl has started, so we're not in a cycle yet
        initial_state = lc.get_state()
        self.failIf(lc.get_progress()["cycle-in-progress"])
        self.failIfIn("cycle-to-date", initial_state)
        self.failIfIn("estimated-remaining-cycle", initial_state)
        self.failIfIn("estimated-current-cycle", initial_state)
        self.failUnlessIn("history", initial_state)
        self.failUnlessEqual(initial_state["history"], {})

        ss.setServiceParent(self.s)

        # number of seconds in one day
        DAY = 24*60*60

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            initial_state = lc.get_state()
            if "cycle-to-date" not in initial_state:
                # the cycle has not started yet: check again on a later turn
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
            self.failUnlessIn("cycle-to-date", initial_state)
            self.failUnlessIn("estimated-remaining-cycle", initial_state)
            self.failUnlessIn("estimated-current-cycle", initial_state)
            self.failUnlessIn("history", initial_state)
            self.failUnlessEqual(initial_state["history"], {})

            so_far = initial_state["cycle-to-date"]
            self.failUnlessEqual(so_far["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", so_far)
            self.failUnlessIn("lease-age-histogram", so_far)
            lah = so_far["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
            self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
            self.failUnlessEqual(so_far["corrupt-shares"], [])
            sr1 = so_far["space-recovered"]
            self.failUnlessEqual(sr1["examined-buckets"], 1)
            self.failUnlessEqual(sr1["examined-shares"], 1)
            self.failUnlessEqual(sr1["actual-shares"], 0)
            self.failUnlessEqual(sr1["configured-diskbytes"], 0)
            self.failUnlessEqual(sr1["original-sharebytes"], 0)
            left = initial_state["estimated-remaining-cycle"]
            sr2 = left["space-recovered"]
            self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
            self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
            self.failIfEqual(sr2["actual-shares"], None)
            self.failIfEqual(sr2["configured-diskbytes"], None)
            self.failIfEqual(sr2["original-sharebytes"], None)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            # status page rendered while the first cycle is still running
            s = remove_tags(html)
            self.failUnlessIn("So far, this cycle has examined "
                              "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
            self.failUnlessIn("and has recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable), "
                              "0 B (0 B / 0 B)", s)
            self.failUnlessIn("If expiration were enabled, "
                              "we would have recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B) by now", s)
            self.failUnlessIn("and the remainder of this cycle "
                              "would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("and the whole cycle would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("if we were strictly using each lease's default "
                              "31-day lease lifetime", s)
            self.failUnlessIn("this cycle would be expected to recover: ", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Nothing should have
        # been removed.
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            # the in-progress keys are gone once the cycle has finished
            self.failIf("cycle-to-date" in s)
            self.failIf("estimated-remaining-cycle" in s)
            self.failIf("estimated-current-cycle" in s)
            # the finished cycle is recorded in the history under key 0
            last = s["history"][0]
            self.failUnlessIn("cycle-start-finish-times", last)
            self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
            self.failUnlessEqual(last["expiration-enabled"], False)
            self.failUnlessIn("configured-expiration-mode", last)

            self.failUnlessIn("lease-age-histogram", last)
            lah = last["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            # make_shares() created six leases in total
            self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )

            # two shares with one lease, two shares with two leases
            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
            self.failUnlessEqual(last["corrupt-shares"], [])

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 0)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 0)
            self.failUnlessEqual(rec["actual-shares"], 0)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 0)
            self.failUnlessEqual(rec["actual-diskbytes"], 0)
            self.failUnlessEqual(rec["original-diskbytes"], 0)
            self.failUnlessEqual(rec["configured-diskbytes"], 0)
            self.failUnlessEqual(rec["actual-sharebytes"], 0)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], 0)

            def _get_sharefile(si):
                return list(ss._iter_share_files(si))[0]
            def count_leases(si):
                return len(list(_get_sharefile(si).get_leases()))
            # every lease created by make_shares() is still present
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("recovered: 0 shares, 0 buckets "
                              "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
            self.failUnlessIn("and saw a total of 4 shares, 4 buckets "
                              "(2 mutable / 2 immutable),", s)
            self.failUnlessIn("but expiration was not enabled", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(webstatus))
        def _check_json(json):
            # the JSON rendering must parse and contain both lease-checker keys
            data = simplejson.loads(json)
            self.failUnlessIn("lease-checker", data)
            self.failUnlessIn("lease-checker-progress", data)
        d.addCallback(_check_json)
        return d
3264
3265     def backdate_lease(self, sf, renew_secret, new_expire_time):
3266         # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
3267         # "renew" a lease with a new_expire_time that is older than what the
3268         # current lease has), so we have to reach inside it.
3269         for i,lease in enumerate(sf.get_leases()):
3270             if lease.renew_secret == renew_secret:
3271                 lease.expiration_time = new_expire_time
3272                 f = open(sf.home, 'rb+')
3273                 sf._write_lease_record(f, i, lease)
3274                 f.close()
3275                 return
3276         raise IndexError("unable to renew non-existent lease")
3277
    def test_expire_age(self):
        """Run one cycle with age-based expiration enabled.

        The first lease on every share is back-dated so that it counts as
        expired. Shares whose only lease expired must be deleted; shares with
        a second, fresh lease must survive with one lease left.
        """
        basedir = "storage/LeaseCrawler/expire_age"
        fileutil.make_dirs(basedir)
        # setting expiration_override_lease_duration to 2000 means that any
        # lease which is more than 2000s old will be expired.
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="age",
                                       expiration_override_lease_duration=2000)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        # starting point: one share per storage index, with 1 or 2 leases
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like it expired already (age=1000s).
        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)
        now = time.time()

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
        # remember the size now; this share is expected to be deleted
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
        # remember the size now; this share is expected to be deleted
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            if not p["cycle-in-progress"]:
                # the cycle has not started yet: check again on a later turn
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
        # have been removed
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            # single-lease shares are gone; two-lease shares keep one lease
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)

            s = lc.get_state()
            last = s["history"][0]

            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("age", 2000, None, ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 2)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 2)
            self.failUnlessEqual(rec["configured-shares"], 2)
            # recovered bytes must equal the sizes of the two deleted shares
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], size)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
            # the configured 2000 seconds is a little over 33 minutes
            self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
        return d
3415
    def test_expire_cutoff_date(self):
        """Exercise the lease checker in "cutoff-date" expiration mode.

        Four shares are created (two immutable, two mutable); one of each
        kind carries a second lease. The first lease on every share is
        backdated so it looks like it was renewed 3000s ago, which is older
        than the configured cutoff of 2000s ago. After one full crawler cycle
        the single-lease shares must be gone, the two-lease shares must
        survive with exactly one lease, and both the crawler state and the
        rendered status page must report those numbers.

        Returns the Deferred that drives the test.
        """
        basedir = "storage/LeaseCrawler/expire_cutoff_date"
        fileutil.make_dirs(basedir)
        # setting cutoff-date to 2000 seconds ago means that any lease which
        # is more than 2000s old will be expired.
        now = time.time()
        then = int(now - 2000)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="cutoff-date",
                                       expiration_cutoff_date=then)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        # pause after the first bucket so the mid-cycle status can be examined
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        # small helpers to inspect what is currently stored for a given SI
        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        # sanity-check the starting layout: one share per SI, and the "_1"
        # and "_3" shares have two leases each
        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like it was renewed 3000s ago. To
        # achieve this, we need to set the expiration time to
        # now-3000+31days. This will change when the lease format is improved
        # to contain both create/renew time and duration.
        new_expiration_time = now - 3000 + 31*24*60*60

        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        # remember the size now: the share file is expected to be deleted
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        # remember the size now: the share file is expected to be deleted
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

        # attaching the server to the running parent service starts it
        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            if not p["cycle-in-progress"]:
                # the crawler has not started yet: try again on a later turn
                d2 = fireEventually()
                d2.addCallback(_after_first_bucket)
                return d2
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
        # have been removed
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)

            s = lc.get_state()
            # history entry 0 is the cycle that just finished
            last = s["history"][0]

            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("cutoff-date", None, then,
                                  ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"],
                                 {1: 2, 2: 2})

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            # NOTE(review): the "original-*" counters are expected to stay at
            # zero here; presumably they are judged by each lease's own
            # expiration time, which is still in the future
            # (now-3000+31days) -- confirm against expirer.py
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled:"
                              " expired leases will be removed", s)
            # the page is expected to show the cutoff both as ISO date and as
            # day-month-year, in UTC
            date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
            substr = "Leases created or last renewed before %s will be considered expired." % date
            self.failUnlessIn(substr, s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
        return d
3564
3565     def test_only_immutable(self):
3566         basedir = "storage/LeaseCrawler/only_immutable"
3567         fileutil.make_dirs(basedir)
3568         now = time.time()
3569         then = int(now - 2000)
3570         ss = StorageServer(basedir, "\x00" * 20,
3571                            expiration_enabled=True,
3572                            expiration_mode="cutoff-date",
3573                            expiration_cutoff_date=then,
3574                            expiration_sharetypes=("immutable",))
3575         lc = ss.lease_checker
3576         lc.slow_start = 0
3577         webstatus = StorageStatus(ss)
3578
3579         self.make_shares(ss)
3580         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3581         # set all leases to be expirable
3582         new_expiration_time = now - 3000 + 31*24*60*60
3583
3584         def count_shares(si):
3585             return len(list(ss._iter_share_files(si)))
3586         def _get_sharefile(si):
3587             return list(ss._iter_share_files(si))[0]
3588         def count_leases(si):
3589             return len(list(_get_sharefile(si).get_leases()))
3590
3591         sf0 = _get_sharefile(immutable_si_0)
3592         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3593         sf1 = _get_sharefile(immutable_si_1)
3594         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3595         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3596         sf2 = _get_sharefile(mutable_si_2)
3597         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3598         sf3 = _get_sharefile(mutable_si_3)
3599         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3600         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3601
3602         ss.setServiceParent(self.s)
3603         def _wait():
3604             return bool(lc.get_state()["last-cycle-finished"] is not None)
3605         d = self.poll(_wait)
3606
3607         def _after_first_cycle(ignored):
3608             self.failUnlessEqual(count_shares(immutable_si_0), 0)
3609             self.failUnlessEqual(count_shares(immutable_si_1), 0)
3610             self.failUnlessEqual(count_shares(mutable_si_2), 1)
3611             self.failUnlessEqual(count_leases(mutable_si_2), 1)
3612             self.failUnlessEqual(count_shares(mutable_si_3), 1)
3613             self.failUnlessEqual(count_leases(mutable_si_3), 2)
3614         d.addCallback(_after_first_cycle)
3615         d.addCallback(lambda ign: self.render1(webstatus))
3616         def _check_html(html):
3617             s = remove_tags(html)
3618             self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
3619         d.addCallback(_check_html)
3620         return d
3621
3622     def test_only_mutable(self):
3623         basedir = "storage/LeaseCrawler/only_mutable"
3624         fileutil.make_dirs(basedir)
3625         now = time.time()
3626         then = int(now - 2000)
3627         ss = StorageServer(basedir, "\x00" * 20,
3628                            expiration_enabled=True,
3629                            expiration_mode="cutoff-date",
3630                            expiration_cutoff_date=then,
3631                            expiration_sharetypes=("mutable",))
3632         lc = ss.lease_checker
3633         lc.slow_start = 0
3634         webstatus = StorageStatus(ss)
3635
3636         self.make_shares(ss)
3637         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3638         # set all leases to be expirable
3639         new_expiration_time = now - 3000 + 31*24*60*60
3640
3641         def count_shares(si):
3642             return len(list(ss._iter_share_files(si)))
3643         def _get_sharefile(si):
3644             return list(ss._iter_share_files(si))[0]
3645         def count_leases(si):
3646             return len(list(_get_sharefile(si).get_leases()))
3647
3648         sf0 = _get_sharefile(immutable_si_0)
3649         self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
3650         sf1 = _get_sharefile(immutable_si_1)
3651         self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
3652         self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
3653         sf2 = _get_sharefile(mutable_si_2)
3654         self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
3655         sf3 = _get_sharefile(mutable_si_3)
3656         self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
3657         self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
3658
3659         ss.setServiceParent(self.s)
3660         def _wait():
3661             return bool(lc.get_state()["last-cycle-finished"] is not None)
3662         d = self.poll(_wait)
3663
3664         def _after_first_cycle(ignored):
3665             self.failUnlessEqual(count_shares(immutable_si_0), 1)
3666             self.failUnlessEqual(count_leases(immutable_si_0), 1)
3667             self.failUnlessEqual(count_shares(immutable_si_1), 1)
3668             self.failUnlessEqual(count_leases(immutable_si_1), 2)
3669             self.failUnlessEqual(count_shares(mutable_si_2), 0)
3670             self.failUnlessEqual(count_shares(mutable_si_3), 0)
3671         d.addCallback(_after_first_cycle)
3672         d.addCallback(lambda ign: self.render1(webstatus))
3673         def _check_html(html):
3674             s = remove_tags(html)
3675             self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
3676         d.addCallback(_check_html)
3677         return d
3678
3679     def test_bad_mode(self):
3680         basedir = "storage/LeaseCrawler/bad_mode"
3681         fileutil.make_dirs(basedir)
3682         e = self.failUnlessRaises(ValueError,
3683                                   StorageServer, basedir, "\x00" * 20,
3684                                   expiration_mode="bogus")
3685         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
3686
3687     def test_parse_duration(self):
3688         DAY = 24*60*60
3689         MONTH = 31*DAY
3690         YEAR = 365*DAY
3691         p = time_format.parse_duration
3692         self.failUnlessEqual(p("7days"), 7*DAY)
3693         self.failUnlessEqual(p("31day"), 31*DAY)
3694         self.failUnlessEqual(p("60 days"), 60*DAY)
3695         self.failUnlessEqual(p("2mo"), 2*MONTH)
3696         self.failUnlessEqual(p("3 month"), 3*MONTH)
3697         self.failUnlessEqual(p("2years"), 2*YEAR)
3698         e = self.failUnlessRaises(ValueError, p, "2kumquats")
3699         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
3700
3701     def test_parse_date(self):
3702         p = time_format.parse_date
3703         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
3704         self.failUnlessEqual(p("2009-03-18"), 1237334400)
3705
3706     def test_limited_history(self):
3707         basedir = "storage/LeaseCrawler/limited_history"
3708         fileutil.make_dirs(basedir)
3709         ss = StorageServer(basedir, "\x00" * 20)
3710         # make it start sooner than usual.
3711         lc = ss.lease_checker
3712         lc.slow_start = 0
3713         lc.cpu_slice = 500
3714
3715         # create a few shares, with some leases on them
3716         self.make_shares(ss)
3717
3718         ss.setServiceParent(self.s)
3719
3720         def _wait_until_15_cycles_done():
3721             last = lc.state["last-cycle-finished"]
3722             if last is not None and last >= 15:
3723                 return True
3724             if lc.timer:
3725                 lc.timer.reset(0)
3726             return False
3727         d = self.poll(_wait_until_15_cycles_done)
3728
3729         def _check(ignored):
3730             s = lc.get_state()
3731             h = s["history"]
3732             self.failUnlessEqual(len(h), 10)
3733             self.failUnlessEqual(max(h.keys()), 15)
3734             self.failUnlessEqual(min(h.keys()), 6)
3735         d.addCallback(_check)
3736         return d
3737
3738     def test_unpredictable_future(self):
3739         basedir = "storage/LeaseCrawler/unpredictable_future"
3740         fileutil.make_dirs(basedir)
3741         ss = StorageServer(basedir, "\x00" * 20)
3742         # make it start sooner than usual.
3743         lc = ss.lease_checker
3744         lc.slow_start = 0
3745         lc.cpu_slice = -1.0 # stop quickly
3746
3747         self.make_shares(ss)
3748
3749         ss.setServiceParent(self.s)
3750
3751         d = fireEventually()
3752         def _check(ignored):
3753             # this should fire after the first bucket is complete, but before
3754             # the first prefix is complete, so the progress-measurer won't
3755             # think we've gotten far enough to raise our percent-complete
3756             # above 0%, triggering the cannot-predict-the-future code in
3757             # expirer.py . This will have to change if/when the
3758             # progress-measurer gets smart enough to count buckets (we'll
3759             # have to interrupt it even earlier, before it's finished the
3760             # first bucket).
3761             s = lc.get_state()
3762             if "cycle-to-date" not in s:
3763                 d2 = fireEventually()
3764                 d2.addCallback(_check)
3765                 return d2
3766             self.failUnlessIn("cycle-to-date", s)
3767             self.failUnlessIn("estimated-remaining-cycle", s)
3768             self.failUnlessIn("estimated-current-cycle", s)
3769
3770             left = s["estimated-remaining-cycle"]["space-recovered"]
3771             self.failUnlessEqual(left["actual-buckets"], None)
3772             self.failUnlessEqual(left["original-buckets"], None)
3773             self.failUnlessEqual(left["configured-buckets"], None)
3774             self.failUnlessEqual(left["actual-shares"], None)
3775             self.failUnlessEqual(left["original-shares"], None)
3776             self.failUnlessEqual(left["configured-shares"], None)
3777             self.failUnlessEqual(left["actual-diskbytes"], None)
3778             self.failUnlessEqual(left["original-diskbytes"], None)
3779             self.failUnlessEqual(left["configured-diskbytes"], None)
3780             self.failUnlessEqual(left["actual-sharebytes"], None)
3781             self.failUnlessEqual(left["original-sharebytes"], None)
3782             self.failUnlessEqual(left["configured-sharebytes"], None)
3783
3784             full = s["estimated-remaining-cycle"]["space-recovered"]
3785             self.failUnlessEqual(full["actual-buckets"], None)
3786             self.failUnlessEqual(full["original-buckets"], None)
3787             self.failUnlessEqual(full["configured-buckets"], None)
3788             self.failUnlessEqual(full["actual-shares"], None)
3789             self.failUnlessEqual(full["original-shares"], None)
3790             self.failUnlessEqual(full["configured-shares"], None)
3791             self.failUnlessEqual(full["actual-diskbytes"], None)
3792             self.failUnlessEqual(full["original-diskbytes"], None)
3793             self.failUnlessEqual(full["configured-diskbytes"], None)
3794             self.failUnlessEqual(full["actual-sharebytes"], None)
3795             self.failUnlessEqual(full["original-sharebytes"], None)
3796             self.failUnlessEqual(full["configured-sharebytes"], None)
3797
3798         d.addCallback(_check)
3799         return d
3800
3801     def test_no_st_blocks(self):
3802         basedir = "storage/LeaseCrawler/no_st_blocks"
3803         fileutil.make_dirs(basedir)
3804         ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
3805                                         expiration_mode="age",
3806                                         expiration_override_lease_duration=-1000)
3807         # a negative expiration_time= means the "configured-"
3808         # space-recovered counts will be non-zero, since all shares will have
3809         # expired by then
3810
3811         # make it start sooner than usual.
3812         lc = ss.lease_checker
3813         lc.slow_start = 0
3814
3815         self.make_shares(ss)
3816         ss.setServiceParent(self.s)
3817         def _wait():
3818             return bool(lc.get_state()["last-cycle-finished"] is not None)
3819         d = self.poll(_wait)
3820
3821         def _check(ignored):
3822             s = lc.get_state()
3823             last = s["history"][0]
3824             rec = last["space-recovered"]
3825             self.failUnlessEqual(rec["configured-buckets"], 4)
3826             self.failUnlessEqual(rec["configured-shares"], 4)
3827             self.failUnless(rec["configured-sharebytes"] > 0,
3828                             rec["configured-sharebytes"])
3829             # without the .st_blocks field in os.stat() results, we should be
3830             # reporting diskbytes==sharebytes
3831             self.failUnlessEqual(rec["configured-sharebytes"],
3832                                  rec["configured-diskbytes"])
3833         d.addCallback(_check)
3834         return d
3835
3836     def test_share_corruption(self):
3837         self._poll_should_ignore_these_errors = [
3838             UnknownMutableContainerVersionError,
3839             UnknownImmutableContainerVersionError,
3840             ]
3841         basedir = "storage/LeaseCrawler/share_corruption"
3842         fileutil.make_dirs(basedir)
3843         ss = InstrumentedStorageServer(basedir, "\x00" * 20)
3844         w = StorageStatus(ss)
3845         # make it start sooner than usual.
3846         lc = ss.lease_checker
3847         lc.stop_after_first_bucket = True
3848         lc.slow_start = 0
3849         lc.cpu_slice = 500
3850
3851         # create a few shares, with some leases on them
3852         self.make_shares(ss)
3853
3854         # now corrupt one, and make sure the lease-checker keeps going
3855         [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
3856         first = min(self.sis)
3857         first_b32 = base32.b2a(first)
3858         fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
3859         f = open(fn, "rb+")
3860         f.seek(0)
3861         f.write("BAD MAGIC")
3862         f.close()
3863         # if get_share_file() doesn't see the correct mutable magic, it
3864         # assumes the file is an immutable share, and then
3865         # immutable.ShareFile sees a bad version. So regardless of which kind
3866         # of share we corrupted, this will trigger an
3867         # UnknownImmutableContainerVersionError.
3868
3869         # also create an empty bucket
3870         empty_si = base32.b2a("\x04"*16)
3871         empty_bucket_dir = os.path.join(ss.sharedir,
3872                                         storage_index_to_dir(empty_si))
3873         fileutil.make_dirs(empty_bucket_dir)
3874
3875         ss.setServiceParent(self.s)
3876
3877         d = fireEventually()
3878
3879         # now examine the state right after the first bucket has been
3880         # processed.
3881         def _after_first_bucket(ignored):
3882             s = lc.get_state()
3883             if "cycle-to-date" not in s:
3884                 d2 = fireEventually()
3885                 d2.addCallback(_after_first_bucket)
3886                 return d2
3887             so_far = s["cycle-to-date"]
3888             rec = so_far["space-recovered"]
3889             self.failUnlessEqual(rec["examined-buckets"], 1)
3890             self.failUnlessEqual(rec["examined-shares"], 0)
3891             self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
3892         d.addCallback(_after_first_bucket)
3893
3894         d.addCallback(lambda ign: self.render_json(w))
3895         def _check_json(json):
3896             data = simplejson.loads(json)
3897             # grr. json turns all dict keys into strings.
3898             so_far = data["lease-checker"]["cycle-to-date"]
3899             corrupt_shares = so_far["corrupt-shares"]
3900             # it also turns all tuples into lists
3901             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3902         d.addCallback(_check_json)
3903         d.addCallback(lambda ign: self.render1(w))
3904         def _check_html(html):
3905             s = remove_tags(html)
3906             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3907         d.addCallback(_check_html)
3908
3909         def _wait():
3910             return bool(lc.get_state()["last-cycle-finished"] is not None)
3911         d.addCallback(lambda ign: self.poll(_wait))
3912
3913         def _after_first_cycle(ignored):
3914             s = lc.get_state()
3915             last = s["history"][0]
3916             rec = last["space-recovered"]
3917             self.failUnlessEqual(rec["examined-buckets"], 5)
3918             self.failUnlessEqual(rec["examined-shares"], 3)
3919             self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
3920         d.addCallback(_after_first_cycle)
3921         d.addCallback(lambda ign: self.render_json(w))
3922         def _check_json_history(json):
3923             data = simplejson.loads(json)
3924             last = data["lease-checker"]["history"]["0"]
3925             corrupt_shares = last["corrupt-shares"]
3926             self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
3927         d.addCallback(_check_json_history)
3928         d.addCallback(lambda ign: self.render1(w))
3929         def _check_html_history(html):
3930             s = remove_tags(html)
3931             self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
3932         d.addCallback(_check_html_history)
3933
3934         def _cleanup(res):
3935             self.flushLoggedErrors(UnknownMutableContainerVersionError,
3936                                    UnknownImmutableContainerVersionError)
3937             return res
3938         d.addBoth(_cleanup)
3939         return d
3940
3941     def render_json(self, page):
3942         d = self.render1(page, args={"t": ["json"]})
3943         return d
3944
3945 class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
3946
3947     def setUp(self):
3948         self.s = service.MultiService()
3949         self.s.startService()
3950     def tearDown(self):
3951         return self.s.stopService()
3952
3953     def test_no_server(self):
3954         w = StorageStatus(None)
3955         html = w.renderSynchronously()
3956         self.failUnlessIn("<h1>No Storage Server Running</h1>", html)
3957
3958     def test_status(self):
3959         basedir = "storage/WebStatus/status"
3960         fileutil.make_dirs(basedir)
3961         ss = StorageServer(basedir, "\x00" * 20)
3962         ss.setServiceParent(self.s)
3963         w = StorageStatus(ss)
3964         d = self.render1(w)
3965         def _check_html(html):
3966             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3967             s = remove_tags(html)
3968             self.failUnlessIn("Accepting new shares: Yes", s)
3969             self.failUnlessIn("Reserved space: - 0 B (0)", s)
3970         d.addCallback(_check_html)
3971         d.addCallback(lambda ign: self.render_json(w))
3972         def _check_json(json):
3973             data = simplejson.loads(json)
3974             s = data["stats"]
3975             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
3976             self.failUnlessEqual(s["storage_server.reserved_space"], 0)
3977             self.failUnlessIn("bucket-counter", data)
3978             self.failUnlessIn("lease-checker", data)
3979         d.addCallback(_check_json)
3980         return d
3981
3982     def render_json(self, page):
3983         d = self.render1(page, args={"t": ["json"]})
3984         return d
3985
3986     @mock.patch('allmydata.util.fileutil.get_disk_stats')
3987     def test_status_no_disk_stats(self, mock_get_disk_stats):
3988         mock_get_disk_stats.side_effect = AttributeError()
3989
3990         # Some platforms may have no disk stats API. Make sure the code can handle that
3991         # (test runs on all platforms).
3992         basedir = "storage/WebStatus/status_no_disk_stats"
3993         fileutil.make_dirs(basedir)
3994         ss = StorageServer(basedir, "\x00" * 20)
3995         ss.setServiceParent(self.s)
3996         w = StorageStatus(ss)
3997         html = w.renderSynchronously()
3998         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3999         s = remove_tags(html)
4000         self.failUnlessIn("Accepting new shares: Yes", s)
4001         self.failUnlessIn("Total disk space: ?", s)
4002         self.failUnlessIn("Space Available to Tahoe: ?", s)
4003         self.failUnless(ss.get_available_space() is None)
4004
4005     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4006     def test_status_bad_disk_stats(self, mock_get_disk_stats):
4007         mock_get_disk_stats.side_effect = OSError()
4008
4009         # If the API to get disk stats exists but a call to it fails, then the status should
4010         # show that no shares will be accepted, and get_available_space() should be 0.
4011         basedir = "storage/WebStatus/status_bad_disk_stats"
4012         fileutil.make_dirs(basedir)
4013         ss = StorageServer(basedir, "\x00" * 20)
4014         ss.setServiceParent(self.s)
4015         w = StorageStatus(ss)
4016         html = w.renderSynchronously()
4017         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4018         s = remove_tags(html)
4019         self.failUnlessIn("Accepting new shares: No", s)
4020         self.failUnlessIn("Total disk space: ?", s)
4021         self.failUnlessIn("Space Available to Tahoe: ?", s)
4022         self.failUnlessEqual(ss.get_available_space(), 0)
4023
4024     @mock.patch('allmydata.util.fileutil.get_disk_stats')
4025     def test_status_right_disk_stats(self, mock_get_disk_stats):
4026         GB = 1000000000
4027         total            = 5*GB
4028         free_for_root    = 4*GB
4029         free_for_nonroot = 3*GB
4030         reserved_space   = 1*GB
4031         used = total - free_for_root
4032         avail = max(free_for_nonroot - reserved_space, 0)
4033         mock_get_disk_stats.return_value = {
4034             'total': total,
4035             'free_for_root': free_for_root,
4036             'free_for_nonroot': free_for_nonroot,
4037             'used': used,
4038             'avail': avail,
4039         }
4040
4041         basedir = "storage/WebStatus/status_right_disk_stats"
4042         fileutil.make_dirs(basedir)
4043         ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
4044         expecteddir = ss.sharedir
4045         ss.setServiceParent(self.s)
4046         w = StorageStatus(ss)
4047         html = w.renderSynchronously()
4048
4049         self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4050                     mock_get_disk_stats.call_args_list)
4051
4052         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4053         s = remove_tags(html)
4054         self.failUnlessIn("Total disk space: 5.00 GB", s)
4055         self.failUnlessIn("Disk space used: - 1.00 GB", s)
4056         self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4057         self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4058         self.failUnlessIn("Reserved space: - 1.00 GB", s)
4059         self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4060         self.failUnlessEqual(ss.get_available_space(), 2*GB)
4061
4062     def test_readonly(self):
4063         basedir = "storage/WebStatus/readonly"
4064         fileutil.make_dirs(basedir)
4065         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
4066         ss.setServiceParent(self.s)
4067         w = StorageStatus(ss)
4068         html = w.renderSynchronously()
4069         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4070         s = remove_tags(html)
4071         self.failUnlessIn("Accepting new shares: No", s)
4072
4073     def test_reserved(self):
4074         basedir = "storage/WebStatus/reserved"
4075         fileutil.make_dirs(basedir)
4076         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4077         ss.setServiceParent(self.s)
4078         w = StorageStatus(ss)
4079         html = w.renderSynchronously()
4080         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4081         s = remove_tags(html)
4082         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4083
4084     def test_huge_reserved(self):
4085         basedir = "storage/WebStatus/reserved"
4086         fileutil.make_dirs(basedir)
4087         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
4088         ss.setServiceParent(self.s)
4089         w = StorageStatus(ss)
4090         html = w.renderSynchronously()
4091         self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4092         s = remove_tags(html)
4093         self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4094
4095     def test_util(self):
4096         w = StorageStatus(None)
4097         self.failUnlessEqual(w.render_space(None, None), "?")
4098         self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4099         self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4100         self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4101         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4102         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)