git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
Tests for googlestorage_container.AuthenticationClient.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 import time, os.path, platform, re, simplejson, struct, itertools, urllib
3 from collections import deque
4 from cStringIO import StringIO
5 import thread
6
7 import mock
8 from twisted.trial import unittest
9
10 from twisted.internet import defer
11 from twisted.internet.task import Clock
12 from allmydata.util.deferredutil import for_items
13 from twisted.web.iweb import IBodyProducer, UNKNOWN_LENGTH
14 from twisted.web.http_headers import Headers
15 from twisted.protocols.ftp import FileConsumer
16 from twisted.web.client import ResponseDone
17
18 from twisted.python.failure import Failure
19 from foolscap.logging.log import OPERATIONAL, INFREQUENT, WEIRD
20 from foolscap.logging.web import LogEvent
21
22 from allmydata import interfaces
23 from allmydata.util.assertutil import precondition
24 from allmydata.util import fileutil, hashutil, base32, time_format
25 from allmydata.storage.server import StorageServer
26 from allmydata.storage.backends.null.null_backend import NullBackend
27 from allmydata.storage.backends.disk.disk_backend import DiskBackend
28 from allmydata.storage.backends.disk.immutable import load_immutable_disk_share, \
29      create_immutable_disk_share, ImmutableDiskShare
30 from allmydata.storage.backends.disk.mutable import create_mutable_disk_share, MutableDiskShare
31 from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
32 from allmydata.storage.backends.cloud.cloud_common import CloudError, CloudServiceError, \
33      ContainerItem, ContainerListing
34 from allmydata.storage.backends.cloud import mock_cloud, cloud_common
35 from allmydata.storage.backends.cloud.mock_cloud import MockContainer
36 from allmydata.storage.backends.cloud.openstack import openstack_container
37 from allmydata.storage.backends.cloud.googlestorage import googlestorage_container
38 from allmydata.storage.bucket import BucketWriter, BucketReader
39 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir
40 from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE
41 from allmydata.storage.expiration import ExpirationPolicy
42 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
43      ReadBucketProxy
44 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
45                                      LayoutInvalid, MDMFSIGNABLEHEADER, \
46                                      SIGNED_PREFIX, MDMFHEADER, \
47                                      MDMFOFFSETS, SDMFSlotWriteProxy, \
48                                      PRIVATE_KEY_SIZE, \
49                                      SIGNATURE_SIZE, \
50                                      VERIFICATION_KEY_SIZE, \
51                                      SHARE_HASH_CHAIN_SIZE
52 from allmydata.interfaces import BadWriteEnablerError, RIStorageServer
53 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin, CrawlerTestMixin, \
54      FakeCanary
55 from allmydata.test.common_util import ReallyEqualMixin
56 from allmydata.test.common_web import WebRenderingMixin
57 from allmydata.test.no_network import NoNetworkServer
58 from allmydata.web.storage import StorageStatus, remove_prefix
59
60
class FakeAccount:
    """
    Minimal account object handed to BucketWriter/BucketReader in these tests.

    It remembers the server it was created with; every bookkeeping method
    accepts its arguments and does nothing.
    """

    def __init__(self, server):
        # Whatever object the test wants to play the role of the server.
        self.server = server

    def add_share(self, storage_index, shnum, used_space, sharetype, commit=True):
        """Ignore a notification that a share was added."""
        return None

    def add_or_renew_default_lease(self, storage_index, shnum, commit=True):
        """Ignore a request to add or renew the default lease."""
        return None

    def mark_share_as_stable(self, storage_index, shnum, used_space, commit=True):
        """Ignore a notification that a share has become stable."""
        return None
70
class FakeStatsProvider:
    """Stats provider stub: both methods accept their arguments and do nothing."""

    def count(self, name, delta=1):
        """Ignore a counter increment."""
        return None

    def register_producer(self, producer):
        """Ignore a producer registration."""
        return None
76
77
class ServiceParentMixin:
    """
    Test mixin that runs a LoggingServiceParent for the duration of each test.

    setUp() starts the parent and stores it as self.sparent; tearDown()
    stops it and returns whatever stopService() returns.
    """

    def setUp(self):
        parent = LoggingServiceParent()
        self.sparent = parent
        parent.startService()
        # A fresh itertools.count() per test, kept on the instance.
        self._lease_secret = itertools.count()

    def tearDown(self):
        stopped = self.sparent.stopService()
        return stopped
86
87
class WorkdirMixin:
    """Mixin that derives a per-test-class working directory path."""

    def workdir(self, name):
        """Return the relative path storage/<test class name>/<name>."""
        components = ("storage", self.__class__.__name__, name)
        return os.path.join(*components)
91
92
class BucketTestMixin(WorkdirMixin):
    """
    Helpers shared by the bucket tests.

    Besides building work directories, it supplies no-op
    bucket_writer_closed/add_latency/count methods; the tests pass the test
    case itself to FakeAccount as the server, so these are presumably the
    hooks the bucket code expects on that object.
    """

    def make_workdir(self, name):
        """
        Create <workdir>/tmp and return the (incoming, final) share paths.

        incoming is <workdir>/tmp/bucket and final is <workdir>/bucket; only
        the tmp directory is actually created.
        """
        base = self.workdir(name)
        tmp = os.path.join(base, "tmp")
        fileutil.make_dirs(tmp)
        return os.path.join(tmp, "bucket"), os.path.join(base, "bucket")

    def bucket_writer_closed(self, bw, consumed):
        """Ignore the notification."""
        return None

    def add_latency(self, category, latency):
        """Ignore the latency sample."""
        return None

    def count(self, name, delta=1):
        """Ignore the counter increment."""
        return None
110
111
class Bucket(BucketTestMixin, unittest.TestCase):
    """Drive BucketWriter and BucketReader directly against immutable disk shares."""

    def test_create(self):
        """Create a share with 200 bytes allocated, write four chunks, and close it."""
        incoming, final = self.make_workdir("test_create")
        account = FakeAccount(self)
        # (offset, data) pairs written in order.
        chunks = [(0, "a"*25), (25, "b"*25), (50, "c"*25), (75, "d"*7)]

        def _make_share(ign):
            return create_immutable_disk_share(incoming, final, allocated_data_length=200,
                                               storage_index="si1", shnum=0)

        def _fill_share(share):
            writer = BucketWriter(account, share, FakeCanary())
            filled = defer.succeed(None)
            for (offset, data) in chunks:
                # Bind the loop variables as defaults so each callback keeps its own pair.
                filled.addCallback(lambda ign, offset=offset, data=data:
                                   writer.remote_write(offset, data))
            filled.addCallback(lambda ign: writer.remote_close())
            return filled

        d = defer.succeed(None)
        d.addCallback(_make_share)
        d.addCallback(_fill_share)
        return d

    def test_readwrite(self):
        """Write three chunks through a BucketWriter, then read each back via a BucketReader."""
        incoming, final = self.make_workdir("test_readwrite")
        account = FakeAccount(self)
        # (offset, data) pairs; the last block may be short.
        chunks = [(0, "a"*25), (25, "b"*25), (50, "c"*7)]

        def _make_share(ign):
            return create_immutable_disk_share(incoming, final, allocated_data_length=200,
                                               storage_index="si1", shnum=0)

        def _write_then_read(share):
            writer = BucketWriter(account, share, FakeCanary())
            written = defer.succeed(None)
            for (offset, data) in chunks:
                written.addCallback(lambda ign, offset=offset, data=data:
                                    writer.remote_write(offset, data))
            written.addCallback(lambda ign: writer.remote_close())

            # now read from it
            def _read_back(ign):
                reader = BucketReader(account, share)
                checked = defer.succeed(None)
                for (offset, data) in chunks:
                    checked.addCallback(lambda ign, offset=offset, data=data:
                                        reader.remote_read(offset, len(data)))
                    checked.addCallback(lambda res, data=data:
                                        self.failUnlessEqual(res, data))
                return checked
            written.addCallback(_read_back)
            return written

        d = defer.succeed(None)
        d.addCallback(_make_share)
        d.addCallback(_write_then_read)
        return d

    def test_read_past_end_of_share_data(self):
        """Reads that extend beyond the share data return only the share data."""
        # Test vector for immutable files: the hard-coded contents of an
        # immutable share file, starting with three big-endian 32-bit fields.
        containerdata = struct.pack('>LLL', 1, 1, 1)

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = 'a'
        extra_data = 'b' * ImmutableDiskShare.LEASE_SIZE
        share_file_data = containerdata + share_data + extra_data

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")
        fileutil.write(final, share_file_data)

        def _check_reads(share):
            mockstorageserver = mock.Mock()
            account = FakeAccount(mockstorageserver)
            reader = BucketReader(account, share)

            def _expect_share_data(res):
                return self.failUnlessEqual(res, share_data)

            # First an exact-length read, issued directly.
            reads = reader.remote_read(0, len(share_data))
            reads.addCallback(_expect_share_data)

            # Then two over-long reads: one spanning the trailing extra data
            # (the original test describes this as reading past the end to
            # get the cancel secret), and one that overshoots by one byte.
            overlong = (len(share_data) + len(extra_data), len(share_data) + 1)
            for length in overlong:
                reads.addCallback(lambda ign, length=length: reader.remote_read(0, length))
                reads.addCallback(_expect_share_data)
            return reads

        d = defer.succeed(None)
        d.addCallback(lambda ign: load_immutable_disk_share(final))
        d.addCallback(_check_reads)
        return d
201
202
class RemoteBucket:
    """
    Local stand-in for a remote reference.

    callRemote("foo", ...) invokes remote_foo(...) on self.target (which the
    test assigns after construction) and counts "slot_readv" calls and calls
    whose method name contains "writev".
    """

    def __init__(self):
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        """Return a Deferred for the result of self.target.remote_<methname>(*args, **kwargs)."""
        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        def _invoke_target():
            # The attribute lookup happens inside maybeDeferred, so a missing
            # method turns into a failed Deferred instead of a synchronous raise.
            bound = getattr(self.target, "remote_" + methname)
            return bound(*args, **kwargs)

        return defer.maybeDeferred(_invoke_target)
219
220
class BucketProxy(BucketTestMixin, unittest.TestCase):
    """Round-trip tests for the immutable write/read bucket proxies over a RemoteBucket."""

    def make_bucket(self, name, size):
        """
        Create an immutable disk share and wrap it for proxy tests.

        Returns a Deferred firing with (BucketWriter, RemoteBucket, final
        share path); the RemoteBucket's target is the BucketWriter.
        """
        incoming, final = self.make_workdir(name)
        account = FakeAccount(self)

        def _create(ign):
            return create_immutable_disk_share(incoming, final, size,
                                               storage_index="si1", shnum=0)

        def _wrap(share):
            writer = BucketWriter(account, share, FakeCanary())
            remote = RemoteBucket()
            remote.target = writer
            return writer, remote, final

        d = defer.succeed(None)
        d.addCallback(_create)
        d.addCallback(_wrap)
        return d

    def test_create(self):
        """A WriteBucketProxy provides IStorageBucketWriter."""
        def _check_provides(made):
            (bw, rb, sharefile) = made
            proxy = WriteBucketProxy(rb, None,
                                     data_size=300,
                                     block_size=10,
                                     num_segments=5,
                                     num_share_hashes=3,
                                     uri_extension_size_max=500)
            self.failUnless(interfaces.IStorageBucketWriter.providedBy(proxy), proxy)

        d = self.make_bucket("test_create", 500)
        d.addCallback(_check_provides)
        return d

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        """
        Write a complete share through wbp_class, then read every field back
        through rbp_class and compare.
        """
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the per-segment
        # merkle trees (crypttext_hash_tree, block_hashes) will have 4 leaves
        # and 7 nodes each. The per-share merkle tree (share_hashes) has 8
        # leaves and 15 nodes, and we need 3 nodes. Furthermore, let's assume
        # the uri_extension is 500 bytes long. That should make the whole
        # share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        #
        # NOTE(review): the formula reserves three 7*32 regions although only
        # two trees are named above.
        sharesize = header_size + 100 + 3*(7*32) + 3*(2+32) + (4+500)

        def _tagged(tag, i):
            return hashutil.tagged_hash(tag, "bar%d" % i)

        crypttext_hashes = [_tagged("crypt", i) for i in range(7)]
        block_hashes = [_tagged("block", i) for i in range(7)]
        share_hashes = [(i, _tagged("share", i)) for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"
        # Block contents by segment number; the final block is short.
        blocks = ["a"*25, "b"*25, "c"*25, "d"*20]

        def _write_everything(made):
            (bw, rb, sharefile) = made
            bp = wbp_class(rb, None,
                           data_size=95,
                           block_size=25,
                           num_segments=4,
                           num_share_hashes=3,
                           uri_extension_size_max=len(uri_extension))

            writes = bp.put_header()
            for (segnum, data) in enumerate(blocks):
                writes.addCallback(lambda ign, segnum=segnum, data=data:
                                   bp.put_block(segnum, data))
            writes.addCallback(lambda ign: bp.put_crypttext_hashes(crypttext_hashes))
            writes.addCallback(lambda ign: bp.put_block_hashes(block_hashes))
            writes.addCallback(lambda ign: bp.put_share_hashes(share_hashes))
            writes.addCallback(lambda ign: bp.put_uri_extension(uri_extension))
            writes.addCallback(lambda ign: bp.close())

            # Hand the finished share on to the reading stage.
            writes.addCallback(lambda ign: load_immutable_disk_share(sharefile))
            return writes

        # now read everything back
        def _read_everything(share):
            reader = BucketReader(FakeAccount(self), share)
            remote = RemoteBucket()
            remote.target = reader
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(remote, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            reads = defer.succeed(None)
            for (segnum, data) in enumerate(blocks):
                reads.addCallback(lambda ign, segnum=segnum, data=data:
                                  rbp.get_block_data(segnum, 25, len(data)))
                reads.addCallback(lambda res, data=data:
                                  self.failUnlessEqual(res, data))

            reads.addCallback(lambda ign: rbp.get_crypttext_hashes())
            reads.addCallback(lambda res: self.failUnlessEqual(res, crypttext_hashes))
            reads.addCallback(lambda ign: rbp.get_block_hashes(set(range(4))))
            reads.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            reads.addCallback(lambda ign: rbp.get_share_hashes())
            reads.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            reads.addCallback(lambda ign: rbp.get_uri_extension())
            reads.addCallback(lambda res: self.failUnlessEqual(res, uri_extension))
            return reads

        d = self.make_bucket(name, sharesize)
        d.addCallback(_write_everything)
        d.addCallback(_read_everything)
        return d

    def test_readwrite_v1(self):
        """Round-trip with the v1 writer (0x24-byte header)."""
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        """Round-trip with the v2 writer (0x44-byte header)."""
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
335
336
class Seek(unittest.TestCase, WorkdirMixin):
    """Check the file-mode behaviour that seek-past-end writes rely on."""

    def test_seek(self):
        """Seeking past EOF in "rb+" mode extends the file and keeps its old contents."""
        basedir = self.workdir("test_seek")
        fileutil.make_dirs(basedir)
        path = os.path.join(basedir, "testfile")
        fileutil.write(path, "start")

        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        writer = open(path, "rb+")
        try:
            writer.seek(100)
            writer.write("100")
        finally:
            writer.close()

        # 100 bytes up to the seek position plus the 3 bytes just written.
        self.failUnlessEqual(os.stat(path).st_size, 100+3)

        reader = open(path, "rb")
        try:
            head = reader.read(5)
        finally:
            reader.close()
        self.failUnlessEqual(head, "start")
361
362
class CloudCommon(unittest.TestCase, ShouldFailMixin, WorkdirMixin):
    """Tests for helpers in cloud_common, using MockContainer as the container."""

    def test_concat(self):
        """concat() flattens an iterable of iterables into a single list."""
        x = deque([[1, 2], (), xrange(3, 6)])
        self.failUnlessEqual(cloud_common.concat(x), [1, 2, 3, 4, 5])

    def test_list_objects_truncated_badly(self):
        """
        A container whose truncated listings never advance makes
        list_objects() fail and log at level WEIRD or above.
        """
        # If a container misbehaves by not producing listings with increasing keys,
        # that should cause an incident.
        basedir = self.workdir("test_list_objects_truncated_badly")
        fileutil.make_dirs(basedir)

        class BadlyTruncatingMockContainer(MockContainer):
            def _list_some_objects(self, container_name, prefix='', marker=None):
                # Always claim truncation ("true") while returning the same
                # single empty-keyed item, so the caller can never progress.
                contents = [ContainerItem("", None, "", 0, None, None)]
                return defer.succeed(ContainerListing(container_name, "", "", 0, "true", contents))

        # Highest log level seen so far, in a dict so the closure can update it.
        s = {"level": 0}
        def call_log_msg(*args, **kwargs):
            # Messages logged without an explicit level= must not raise
            # KeyError from inside the code under test; they simply leave the
            # recorded maximum unchanged.
            s["level"] = max(s["level"], kwargs.get("level", 0))
        self.patch(cloud_common.log, 'msg', call_log_msg)

        container = BadlyTruncatingMockContainer(basedir)
        d = self.shouldFail(AssertionError,
                            'truncated badly', "Not making progress in list_objects",
                            lambda: container.list_objects(prefix=""))
        d.addCallback(lambda ign: self.failUnless(s["level"] >= WEIRD, s["level"]))
        return d

    def test_cloud_share_base(self):
        """CloudShareBase reports its identity, sizes and on-disk path."""
        basedir = self.workdir("test_cloud_share_base")
        fileutil.make_dirs(basedir)

        container = MockContainer(basedir)
        base = cloud_common.CloudShareBase(container, "si1", 1)
        # The base class does not set these itself here; the test supplies them.
        base._data_length = 42
        base._total_size = 100

        self.failUnlessIn("CloudShareBase", repr(base))
        self.failUnlessEqual(base.get_storage_index(), "si1")
        self.failUnlessEqual(base.get_storage_index_string(), "onutc")
        self.failUnlessEqual(base.get_shnum(), 1)
        self.failUnlessEqual(base.get_data_length(), 42)
        self.failUnlessEqual(base.get_size(), 100)
        self.failUnlessEqual(os.path.normpath(base._get_path()),
                             os.path.normpath(os.path.join(basedir, "shares", "on", "onutc", "1")))

    # TODO: test cloud_common.delete_chunks
410
411
class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, unittest.TestCase):
    """
    Tests for the OpenStack cloud container, run against a mocked HTTP Agent.

    _patch_agent() replaces openstack_container.Agent with a mock that checks
    every outgoing request against the expectations registered through
    _set_request() and answers with the canned response stored alongside.
    """

    # Configuration values and canned auth results shared by the tests.
    PROVIDER = "rackspace.com"
    AUTH_SERVICE_URL = "auth_service_url"
    USERNAME = "username"
    CONTAINER = "container"
    API_KEY = "api_key"
    PUBLIC_STORAGE_URL = "https://public.storage.example/a"
    INTERNAL_STORAGE_URL = "https://internal.storage.example/a"
    AUTH_TOKEN = "auth_token"

    # A single fake object and the JSON listing entry that describes it.
    TEST_SHARE_PREFIX = "shares/te/"
    TEST_SHARE_NAME = TEST_SHARE_PREFIX + "test"
    TEST_SHARE_MODIFIED = "2013-02-14T21:30:00Z"
    TEST_SHARE_DATA = "share"
    TEST_SHARE_HASH = "sharehash"
    TEST_LISTING_JSON = ('[{"name": "%s", "bytes": %d, "last_modified": "%s", "hash": "%s"}]'
                         % (TEST_SHARE_NAME, len(TEST_SHARE_DATA), TEST_SHARE_MODIFIED, TEST_SHARE_HASH))

    def _patch_agent(self):
        """
        Reset the expectation table and patch openstack_container.Agent with
        a mock that validates requests and returns canned responses.
        """
        # Maps (method, url) -> (expected_headers, expected_body,
        #                        response_code, response_phrase, response_headers, response_body)
        self._requests = {}

        class MockResponse(object):
            def __init__(mock_self, response_code, response_phrase, response_headers, response_body):
                mock_self.code = response_code
                mock_self.phrase = response_phrase
                mock_self.headers = Headers(response_headers)
                mock_self._body = response_body

            def deliverBody(mock_self, protocol):
                # Deliver the whole body in one chunk, then signal a clean end of response.
                protocol.dataReceived(mock_self._body)
                protocol.connectionLost(Failure(ResponseDone()))

        class MockAgent(object):
            def __init__(mock_self, reactor):
                pass

            def request(mock_self, method, url, headers, bodyProducer=None):
                # Note: 'self' here is the enclosing test case (captured by
                # closure); 'mock_self' is the agent instance.
                self.failUnlessIn((method, url), self._requests)
                (expected_headers, expected_body,
                 response_code, response_phrase, response_headers, response_body) = self._requests[(method, url)]

                # Only the headers named in the expectation are checked.
                self.failUnlessIsInstance(headers, Headers)
                for (key, values) in expected_headers.iteritems():
                    self.failUnlessEqual(headers.getRawHeaders(key), values, str((headers, key)))

                d = defer.succeed(None)
                if bodyProducer is None:
                    # A request without a body producer must have been expected to have no body.
                    self.failUnlessEqual(expected_body, "")
                else:
                    # Drain the producer into a buffer, then compare the body and its declared length.
                    self.failUnless(IBodyProducer.providedBy(bodyProducer))
                    body = StringIO()
                    d = bodyProducer.startProducing(FileConsumer(body))
                    d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body))
                    d.addCallback(lambda ign: self.failUnlessIn(bodyProducer.length,
                                                                (len(expected_body), UNKNOWN_LENGTH)))
                d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body))
                return d

        self.patch(openstack_container, 'Agent', MockAgent)

    def _set_request(self, method, url, expected_headers, expected_body,
                           response_code, response_phrase, response_headers, response_body):
        """
        Register (or replace) the expectation and canned response for (method, url).

        expected_headers and response_headers must be dicts; registering the
        same (method, url) again overwrites the earlier entry.
        """
        precondition(isinstance(expected_headers, dict), expected_headers)
        precondition(isinstance(response_headers, dict), response_headers)
        self._requests[(method, url)] = (expected_headers, expected_body,
                                         response_code, response_phrase, response_headers, response_body)

    def _make_server(self, name):
        """
        Register the authentication request, then build the container,
        CloudBackend and StorageServer under a work directory named after 'name'.

        Sets self.workdir, self.config, self.clock, self.container and
        self.server. _patch_agent() must have been called first, because
        this stores into self._requests via _set_request().
        """
        # This is for the v1 auth protocol.
        #self._set_request('GET', self.AUTH_SERVICE_URL, {
        #                    'X-Auth-User': [self.USERNAME],
        #                    'X-Auth-Key': [self.API_KEY],
        #                  }, "",
        #                  204, "No Content", {
        #                    'X-Storage-Url': [self.STORAGE_URL],
        #                    'X-Auth-Token': [self.AUTH_TOKEN],
        #                  }, "")

        # Expected JSON credentials POST and the canned service-catalog reply.
        self._set_request('POST', self.AUTH_SERVICE_URL, {
                            'Content-Type': ['application/json'],
                          }, '{"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": "username", "apiKey": "api_key"}}}',
                          200, "OK", {
                          }, '''
                          {"access": {"token": {"id": "%s"},
                                      "serviceCatalog": [{"endpoints": [{"region": "FOO", "publicURL": "%s", "internalURL": "%s"}],
                                                          "type": "object-store"}],
                                      "user": {"RAX-AUTH:defaultRegion": "", "name": "%s"}
                                     }
                          }''' % (self.AUTH_TOKEN, self.PUBLIC_STORAGE_URL, self.INTERNAL_STORAGE_URL, self.USERNAME))

        storage_config = {
            'openstack.provider': self.PROVIDER,
            'openstack.url': self.AUTH_SERVICE_URL,
            'openstack.username': self.USERNAME,
            'openstack.container': self.CONTAINER,
        }
        from allmydata.node import _None
        class MockConfig(object):
            def get_config(mock_self, section, option, default=_None, boolean=False):
                # Only the [storage] section may be queried; an option asked
                # for without a default must be one of the configured ones.
                self.failUnlessEqual(section, "storage")
                if default is _None:
                    self.failUnlessIn(option, storage_config)
                return storage_config.get(option, default)
            def get_private_config(mock_self, filename):
                # 'privatedir' is assigned below, before this can be called.
                return fileutil.read(os.path.join(privatedir, filename))

        # NOTE: this rebinds the instance attribute, shadowing the workdir()
        # method from WorkdirMixin with the path string, so _make_server()
        # can only be called once per test instance.
        self.workdir = self.workdir(name)
        privatedir = os.path.join(self.workdir, "private")
        fileutil.make_dirs(privatedir)
        fileutil.write(os.path.join(privatedir, "openstack_api_key"), self.API_KEY)

        self.config = MockConfig()
        self.clock = Clock()
        self.container = openstack_container.configure_openstack_container(self.workdir, self.config)
        backend = CloudBackend(self.container)
        self.server = StorageServer("\x00" * 20, backend, self.workdir,
                                    stats_provider=FakeStatsProvider(), clock=self.clock)
        self.server.setServiceParent(self.sparent)
        self.failUnless(self.server.backend._container is self.container,
                        (self.server.backend._container, self.container))

    def _shutdown(self, res):
        """Shut down the container's auth client and pass 'res' through (for use with addBoth)."""
        # avoid unclean reactor error
        self.container._auth_client.shutdown()
        return res


    def test_authentication_client(self):
        """get_auth_info() yields the storage URLs and token from the canned auth response."""
        self._patch_agent()
        self._make_server("test_authentication_client")

        d = self.container._auth_client.get_auth_info()
        def _check(auth_info):
            self.failUnlessEqual(auth_info.public_storage_url, self.PUBLIC_STORAGE_URL)
            self.failUnlessEqual(auth_info.internal_storage_url, self.INTERNAL_STORAGE_URL)
            self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN)
        d.addCallback(_check)
        d.addBoth(self._shutdown)
        return d

    def test_openstack_container(self):
        """
        Exercise get_object, put_object, list_objects and delete_object on
        the container against canned HTTP responses.
        """
        self._patch_agent()

        # Set up the requests that we expect to receive.
        self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, "unexpected")), {
                            'X-Auth-Token': [self.AUTH_TOKEN],
                          }, "",
                          404, "Not Found", {}, "")

        self._set_request('PUT', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
                            'X-Auth-Token': [self.AUTH_TOKEN],
                            'Content-Type': ['application/octet-stream'],
                            #'Content-Length': [len(self.TEST_SHARE_DATA)],
                          }, self.TEST_SHARE_DATA,
                          204, "No Content", {}, "")

        # The prefix is percent-encoded in full (safe='' also encodes '/').
        quoted_prefix = urllib.quote(self.TEST_SHARE_PREFIX, safe='')
        self._set_request('GET', "%s/%s?format=json&prefix=%s"
                                   % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
                            'X-Auth-Token': [self.AUTH_TOKEN],
                          }, "",
                          200, "OK", {}, self.TEST_LISTING_JSON)

        self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
                            'X-Auth-Token': [self.AUTH_TOKEN],
                          }, "",
                          200, "OK", {}, self.TEST_SHARE_DATA)

        self._make_server("test_openstack_container")

        d = defer.succeed(None)
        # An object with a 404 response surfaces as a CloudError.
        d.addCallback(lambda ign: self.shouldFail(CloudError, "404", None,
                                                  self.container.get_object, "unexpected"))

        d.addCallback(lambda ign: self.container.put_object(self.TEST_SHARE_NAME, self.TEST_SHARE_DATA))
        d.addCallback(lambda res: self.failUnless(res is None, res))

        d.addCallback(lambda ign: self.container.list_objects(prefix=self.TEST_SHARE_PREFIX))
        def _check_listing(listing):
            self.failUnlessEqual(listing.name, self.CONTAINER)
            self.failUnlessEqual(listing.prefix, self.TEST_SHARE_PREFIX)
            self.failUnlessEqual(listing.is_truncated, "false")
            self.failUnlessEqual(len(listing.contents), 1)
            item = listing.contents[0]
            self.failUnlessEqual(item.key, self.TEST_SHARE_NAME)
            self.failUnlessEqual(item.modification_date, self.TEST_SHARE_MODIFIED)
            self.failUnlessEqual(item.etag, self.TEST_SHARE_HASH)
            self.failUnlessEqual(item.size, len(self.TEST_SHARE_DATA))
        d.addCallback(_check_listing)

        d.addCallback(lambda ign: self.container.get_object(self.TEST_SHARE_NAME))
        d.addCallback(lambda res: self.failUnlessEqual(res, self.TEST_SHARE_DATA))

        # Registered only at this point in the chain, so that the earlier
        # listing check still sees the one-item response.
        def _set_up_delete(ign):
            self._set_request('DELETE', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
                                'X-Auth-Token': [self.AUTH_TOKEN],
                              }, "",
                              204, "No Content", {}, "")

            # this changes the response to the request set up above
            self._set_request('GET', "%s/%s?format=json&prefix=%s"
                                       % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
                                'X-Auth-Token': [self.AUTH_TOKEN],
                              }, "",
                              200, "OK", {}, "[]")
        d.addCallback(_set_up_delete)

        d.addCallback(lambda ign: self.container.delete_object(self.TEST_SHARE_NAME))
        d.addCallback(lambda res: self.failUnless(res is None, res))

        d.addCallback(lambda ign: self.container.list_objects(prefix=self.TEST_SHARE_PREFIX))
        def _check_listing_after_delete(listing):
            self.failUnlessEqual(listing.name, self.CONTAINER)
            self.failUnlessEqual(listing.prefix, self.TEST_SHARE_PREFIX)
            self.failUnlessEqual(listing.is_truncated, "false")
            self.failUnlessEqual(len(listing.contents), 0)
        d.addCallback(_check_listing_after_delete)

        d.addBoth(self._shutdown)
        return d
632
633
634
635 class GoogleStorageBackend(ShouldFailMixin, unittest.TestCase):
636     """
637     Tests for the Google Storage API backend.
638
639     All code references in docstrings/comments are to classes/functions in
640     allmydata.storage.backends.cloud.googlestorage.googlestorage_container
641     unless noted otherwise.
642     """
643
    def test_authentication_credentials(self):
        """
        Constructing an AuthenticationClient initializes its credentials
        object (a SignedJwtAssertionCredentials) with the correct
        parameters: the account name as given, and the private key
        base64-encoded.
        """
        # Somewhat fragile tests, but better than nothing.
        # (They reach into the private _credentials attribute.)
        auth = googlestorage_container.AuthenticationClient("u@example.com", "xxx123")
        self.assertEqual(auth._credentials.service_account_name, "u@example.com")
        self.assertEqual(auth._credentials.private_key, "xxx123".encode("base64").strip())
653
654     def test_authentication_initial(self):
655         """
656         When AuthenticationClient() is created, it refreshes its access token.
657         """
658         from oauth2client.client import SignedJwtAssertionCredentials
659         auth = googlestorage_container.AuthenticationClient(
660             "u@example.com", "xxx123",
661             _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
662             _deferToThread=defer.maybeDeferred)
663         self.assertEqual(auth._credentials.refresh.call_count, 1)
664
665     def test_authentication_expired(self):
666         """
667         AuthenticationClient.get_authorization_header() refreshes its
668         credentials if the access token has expired.
669         """
670         from oauth2client.client import SignedJwtAssertionCredentials
671         auth = googlestorage_container.AuthenticationClient(
672             "u@example.com", "xxx123",
673             _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
674             _deferToThread=defer.maybeDeferred)
675         auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
676         auth._credentials.access_token_expired = True
677         auth.get_authorization_header()
678         self.assertEqual(auth._credentials.refresh.call_count, 2)
679
680     def test_authentication_no_refresh(self):
681         """
682         AuthenticationClient.get_authorization_header() does not refresh its
683         credentials if the access token has not expired.
684         """
685         from oauth2client.client import SignedJwtAssertionCredentials
686         auth = googlestorage_container.AuthenticationClient(
687             "u@example.com", "xxx123",
688             _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
689             _deferToThread=defer.maybeDeferred)
690         auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
691         auth._credentials.access_token_expired = False
692         auth.get_authorization_header()
693         self.assertEqual(auth._credentials.refresh.call_count, 1)
694
695     def test_authentication_header(self):
696         """
697         AuthenticationClient.get_authorization_header() returns a value to be
698         used for the Authorization header.
699         """
700         from oauth2client.client import SignedJwtAssertionCredentials
701         class NoNetworkCreds(SignedJwtAssertionCredentials):
702             def refresh(self, http):
703                 self.access_token = "xxx"
704         auth = googlestorage_container.AuthenticationClient(
705             "u@example.com", "xxx123",
706             _credentialsClass=NoNetworkCreds,
707             _deferToThread=defer.maybeDeferred)
708         result = []
709         auth.get_authorization_header().addCallback(result.append)
710         self.assertEqual(result, ["Bearer xxx"])
711
712     def test_authentication_one_refresh(self):
713         """
714         AuthenticationClient._refresh_if_necessary() only runs one refresh
715         request at a time.
716         """
717         # The second call shouldn't happen until the first Deferred fires!
718         results = [defer.Deferred(), defer.succeed(None)]
719         first = results[0]
720
721         def fakeDeferToThread(f, *args):
722             return results.pop(0)
723
724         from oauth2client.client import SignedJwtAssertionCredentials
725         auth = googlestorage_container.AuthenticationClient(
726             "u@example.com", "xxx123",
727             _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
728             _deferToThread=fakeDeferToThread)
729         # Initial authorization call happens...
730         self.assertEqual(len(results), 1)
731         # ... and still isn't finished, so next one doesn't run yet:
732         auth._refresh_if_necessary(force=True)
733         self.assertEqual(len(results), 1)
734         # When first one finishes, second one can run:
735         first.callback(None)
736         self.assertEqual(len(results), 0)
737
    def test_authentication_refresh_call(self):
        """
        AuthenticationClient._refresh_if_necessary() runs the
        authentication refresh in a thread, since it blocks, with a
        httplib2.Http instance.

        No _deferToThread override is passed here, so the client's default
        is exercised. The returned Deferred is handed back to the test
        runner so the check in gotResult runs before the test finishes.
        """
        from httplib2 import Http
        from oauth2client.client import SignedJwtAssertionCredentials
        class NoNetworkCreds(SignedJwtAssertionCredentials):
            # The first parameter is named cred_self so that 'self' below
            # still refers to the enclosing test case.
            def refresh(cred_self, http):
                cred_self.access_token = "xxx"
                self.assertIsInstance(http, Http)
                # Remember which thread performed the refresh.
                self.thread = thread.get_ident()
        auth = googlestorage_container.AuthenticationClient(
            "u@example.com", "xxx123",
            _credentialsClass=NoNetworkCreds)

        def gotResult(ignore):
            # The callback must run in a different thread from the one
            # that did the refresh.
            self.assertNotEqual(thread.get_ident(), self.thread)
        return auth.get_authorization_header().addCallback(gotResult)
758
759
class ServerMixin:
    """
    Helpers shared by the immutable-share server tests: bucket allocation
    plus small callbacks that finish off a bucket writer.

    Users of this mixin must provide self._lease_secret, an iterator of
    integers used to derive per-call lease secrets.
    """

    def allocate(self, account, storage_index, sharenums, size, canary=None):
        """
        Call account.remote_allocate_buckets() with freshly derived lease
        secrets and return a Deferred for its result. A FakeCanary is
        supplied when no canary is given.
        """
        def _fresh_secret():
            return hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())

        # These secrets are not used, but clients still provide them.
        renew_secret = _fresh_secret()
        cancel_secret = _fresh_secret()
        canary = canary or FakeCanary()
        return defer.maybeDeferred(account.remote_allocate_buckets,
                                   storage_index, renew_secret, cancel_secret,
                                   sharenums, size, canary)

    def _write_and_close(self, ign, i, bw):
        """
        Write the number i, right-aligned in 25 characters, at offset 0 of
        bucket writer bw and then close it. Returns a Deferred.
        """
        def _write(ign):
            return bw.remote_write(0, "%25d" % i)

        def _close(ign):
            return bw.remote_close()

        d = defer.succeed(None)
        d.addCallback(_write)
        d.addCallback(_close)
        return d

    def _close_writer(self, ign, i, bw):
        """Close bucket writer bw; ign and i are ignored."""
        result = bw.remote_close()
        return result

    def _abort_writer(self, ign, i, bw):
        """Abort bucket writer bw; ign and i are ignored."""
        result = bw.remote_abort()
        return result
782
783
784 class ServerTest(ServerMixin, ShouldFailMixin):
785     def test_create(self):
786         server = self.create("test_create")
787         aa = server.get_accountant().get_anonymous_account()
788         self.failUnless(RIStorageServer.providedBy(aa), aa)
789
790     def test_declares_fixed_1528(self):
791         server = self.create("test_declares_fixed_1528")
792         aa = server.get_accountant().get_anonymous_account()
793
794         ver = aa.remote_get_version()
795         sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
796         self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
797
798     def test_has_immutable_readv(self):
799         server = self.create("test_has_immutable_readv")
800         aa = server.get_accountant().get_anonymous_account()
801
802         ver = aa.remote_get_version()
803         sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
804         self.failUnless(sv1.get('has-immutable-readv'), sv1)
805
806         # TODO: test that we actually support it
807
808     def test_declares_maximum_share_sizes(self):
809         server = self.create("test_declares_maximum_share_sizes")
810         aa = server.get_accountant().get_anonymous_account()
811
812         ver = aa.remote_get_version()
813         sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
814         self.failUnlessIn('maximum-immutable-share-size', sv1)
815         self.failUnlessIn('maximum-mutable-share-size', sv1)
816
817     def test_declares_available_space(self):
818         ss = self.create("test_declares_available_space")
819         ver = ss.remote_get_version()
820         sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
821         self.failUnlessIn('available-space', sv1)
822
823     def test_create_share(self):
824         server = self.create("test_create_share")
825         backend = server.backend
826         aa = server.get_accountant().get_anonymous_account()
827
828         d = self.allocate(aa, "si1", [0], 75)
829         def _allocated( (already, writers) ):
830             self.failUnlessEqual(already, set())
831             self.failUnlessEqual(set(writers.keys()), set([0]))
832
833             d2 = defer.succeed(None)
834             d2.addCallback(lambda ign: writers[0].remote_write(0, "data"))
835             d2.addCallback(lambda ign: writers[0].remote_close())
836
837             d2.addCallback(lambda ign: backend.get_shareset("si1").get_share(0))
838             d2.addCallback(lambda share: self.failUnless(interfaces.IShareForReading.providedBy(share)))
839
840             d2.addCallback(lambda ign: backend.get_shareset("si1").get_shares())
841             def _check( (shares, corrupted) ):
842                 self.failUnlessEqual(len(shares), 1, str(shares))
843                 self.failUnlessEqual(len(corrupted), 0, str(corrupted))
844             d2.addCallback(_check)
845             return d2
846         d.addCallback(_allocated)
847         return d
848
849     def test_dont_overfill_dirs(self):
850         """
851         This test asserts that if you add a second share whose storage index
852         share lots of leading bits with an extant share (but isn't the exact
853         same storage index), this won't add an entry to the share directory.
854         """
855         server = self.create("test_dont_overfill_dirs")
856         aa = server.get_accountant().get_anonymous_account()
857         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
858                                 "shares")
859
860         def _write_and_get_children( (already, writers) ):
861             d = for_items(self._write_and_close, writers)
862             d.addCallback(lambda ign: sorted(fileutil.listdir(storedir)))
863             return d
864
865         d = self.allocate(aa, "storageindex", [0], 25)
866         d.addCallback(_write_and_get_children)
867
868         def _got_children(children_of_storedir):
869             # Now store another one under another storageindex that has leading
870             # chars the same as the first storageindex.
871             d2 = self.allocate(aa, "storageindey", [0], 25)
872             d2.addCallback(_write_and_get_children)
873             d2.addCallback(lambda res: self.failUnlessEqual(res, children_of_storedir))
874             return d2
875         d.addCallback(_got_children)
876         return d
877
    def OFF_test_allocate(self):
        """
        Allocate, write, close and abort buckets, checking which shares the
        server reports as already present or writable at each step.

        The OFF_ prefix means the name does not start with "test", so the
        test runner does not collect this method.

        NOTE(review): this body uses the results of self.allocate() and
        aa.remote_get_buckets() synchronously, whereas ServerMixin.allocate()
        returns a Deferred; the test would need porting to Deferred chains
        before it can be re-enabled.
        """
        server = self.create("test_allocate")
        aa = server.get_accountant().get_anonymous_account()

        self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})

        already,writers = self.allocate(aa, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = aa.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        # the string form of a reader names its class and its share
        b_str = str(b[0])
        self.failUnlessIn("BucketReader", b_str)
        self.failUnlessIn("mfwgy33dmf2g 0", b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(aa, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # clean up: abort every writer that is still open
        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()
931
    # The following share file content was generated with
    # storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
    # with share data == 'a'. The total size of this input
    # is 85 bytes.
    #
    # Size breakdown: three 4-byte container fields (12 bytes), then
    # 1 byte of share data and one lease record of 4 + 32 + 32 + 4 bytes
    # (73 bytes).
    shareversionnumber = '\x00\x00\x00\x01'
    sharedatalength = '\x00\x00\x00\x01'
    numberofleases = '\x00\x00\x00\x01'
    shareinputdata = 'a'
    ownernumber = '\x00\x00\x00\x00'
    renewsecret  = 'x'*32
    cancelsecret = 'y'*32
    # Read as a big-endian integer this is 2678400, i.e. 31*24*60*60.
    expirationtime = '\x00(\xde\x80'
    # Empty: nothing follows the single lease record.
    nextlease = ''
    containerdata = shareversionnumber + sharedatalength + numberofleases
    client_data = (shareinputdata + ownernumber + renewsecret +
                   cancelsecret + expirationtime + nextlease)
    # The complete on-disk share file image, written by test_read_old_share.
    share_data = containerdata + client_data
    testnodeid = 'testnodeidxxxxxxxxxx'
950
    def test_write_and_read_share(self):
        """
        Write a new share, read it, and test the server and backends'
        handling of simultaneous and successive attempts to write the same
        share.

        Returns a Deferred that fires once the whole chain of checks has run.
        """
        server = self.create("test_write_and_read_share")
        aa = server.get_accountant().get_anonymous_account()
        canary = FakeCanary()

        shareset = server.backend.get_shareset('teststorage_index')
        # Nothing has been allocated yet, so there is no incoming share 0.
        self.failIf(shareset.has_incoming(0))

        # Populate incoming with the sharenum: 0.
        d = aa.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, canary)
        def _allocated( (already, writers) ):
            # This is a white-box test: Inspect incoming and fail unless the sharenum: 0 is listed there.
            self.failUnless(shareset.has_incoming(0))

            # Attempt to create a second share writer with the same sharenum.
            d2 = aa.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, canary)

            # Show that no sharewriter results from a remote_allocate_buckets
            # with the same si and sharenum, until BucketWriter.remote_close()
            # has been called.
            d2.addCallback(lambda (already2, writers2): self.failIf(writers2))

            # Test allocated size.
            # (Only the first allocation, of size 1, should be counted.)
            d2.addCallback(lambda ign: server.allocated_size())
            d2.addCallback(lambda space: self.failUnlessEqual(space, 1))

            # Write 'a' to shnum 0. Only tested together with close and read.
            d2.addCallback(lambda ign: writers[0].remote_write(0, 'a'))

            # Preclose: Inspect final, failUnless nothing there.
            d2.addCallback(lambda ign: server.backend.get_shareset('teststorage_index').get_shares())
            def _check( (shares, corrupted) ):
                self.failUnlessEqual(len(shares), 0, str(shares))
                self.failUnlessEqual(len(corrupted), 0, str(corrupted))
            d2.addCallback(_check)

            d2.addCallback(lambda ign: writers[0].remote_close())

            # Postclose: fail unless written data is in final.
            d2.addCallback(lambda ign: server.backend.get_shareset('teststorage_index').get_shares())
            def _got_shares( (sharesinfinal, corrupted) ):
                self.failUnlessEqual(len(sharesinfinal), 1, str(sharesinfinal))
                self.failUnlessEqual(len(corrupted), 0, str(corrupted))

                d3 = defer.succeed(None)
                # Ask for more bytes than were written; only the single
                # byte of share data is expected back.
                d3.addCallback(lambda ign: sharesinfinal[0].read_share_data(0, 73))
                d3.addCallback(lambda contents: self.failUnlessEqual(contents, self.shareinputdata))
                return d3
            d2.addCallback(_got_shares)

            # Exercise the case that the share we're asking to allocate is
            # already (completely) uploaded.
            # (The result of this call is not inspected; the test only
            # requires that it does not fail.)
            d2.addCallback(lambda ign: aa.remote_allocate_buckets('teststorage_index',
                                                                  'x'*32, 'y'*32, set((0,)), 1, canary))
            return d2
        d.addCallback(_allocated)
        return d
1013
    def test_read_old_share(self):
        """
        This tests whether the code correctly finds and reads shares written out by
        pre-pluggable-backends (Tahoe-LAFS <= v1.8.2) servers. There is a similar test
        in test_download, but that one is from the perspective of the client and exercises
        a deeper stack of code. This one is for exercising just the StorageServer and backend.

        The share file is written directly into the backend's share
        directory from the class-level share_data fixture, bypassing the
        server's own write path.
        """
        server = self.create("test_read_old_share")
        aa = server.get_accountant().get_anonymous_account()

        # Construct a file with the appropriate contents.
        # Note that datalen is the length of the whole file image (container
        # header and lease included), not just of the share data.
        datalen = len(self.share_data)
        sharedir = server.backend.get_shareset('teststorage_index')._get_sharedir()
        fileutil.make_dirs(sharedir)
        fileutil.write(os.path.join(sharedir, "0"), self.share_data)

        # Now begin the test.
        d = aa.remote_get_buckets('teststorage_index')
        def _got_buckets(bs):
            # Exactly one bucket, for share number 0, should be found.
            self.failUnlessEqual(len(bs), 1)
            self.failUnlessIn(0, bs)
            b = bs[0]

            d2 = defer.succeed(None)
            # Asking for datalen bytes must yield only the share data, not
            # the lease information stored after it.
            d2.addCallback(lambda ign: b.remote_read(0, datalen))
            d2.addCallback(lambda res: self.failUnlessEqual(res, self.shareinputdata))

            # If you try to read past the end you get as much input data as is there.
            d2.addCallback(lambda ign: b.remote_read(0, datalen+20))
            d2.addCallback(lambda res: self.failUnlessEqual(res, self.shareinputdata))

            # If you start reading past the end of the file you get the empty string.
            d2.addCallback(lambda ign: b.remote_read(datalen+1, 3))
            d2.addCallback(lambda res: self.failUnlessEqual(res, ''))
            return d2
        d.addCallback(_got_buckets)
        return d
1051
    def test_bad_container_version(self):
        """
        A share whose container version field has been overwritten with an
        invalid value is left out of remote_get_buckets() results, both
        when a good share exists alongside it and when it is the only share.
        """
        server = self.create("test_bad_container_version")
        aa = server.get_accountant().get_anonymous_account()

        # Create and close two shares, 0 and 1, under the same storage index.
        d = self.allocate(aa, "allocate", [0,1], 20)
        def _allocated( (already, writers) ):
            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: writers[0].remote_write(0, "\xff"*10))
            d2.addCallback(lambda ign: writers[0].remote_close())
            d2.addCallback(lambda ign: writers[1].remote_write(1, "\xaa"*10))
            d2.addCallback(lambda ign: writers[1].remote_close())
            return d2
        d.addCallback(_allocated)

        # Corrupt share 0 by overwriting the first four bytes of its file.
        d.addCallback(lambda ign: server.backend.get_shareset("allocate").get_share(0))
        def _write_invalid_version(share0):
            f = open(share0._get_path(), "rb+")
            try:
                f.seek(0)
                f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
            finally:
                f.close()
        d.addCallback(_write_invalid_version)

        # This should ignore the corrupted share; see ticket #1566.
        d.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
        d.addCallback(lambda b: self.failUnlessEqual(set(b.keys()), set([1])))

        # Also if there are only corrupted shares.
        # (Remove the good share 1 so that only the corrupted share 0 is left.)
        d.addCallback(lambda ign: server.backend.get_shareset("allocate").get_share(1))
        d.addCallback(lambda share: share.unlink())
        d.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
        d.addCallback(lambda b: self.failUnlessEqual(b, {}))
        return d
1086
    def test_advise_corruption(self):
        """
        Corruption advisories, whether sent through the account or through
        a bucket reader, are written as report files in the
        "corruption-advisories" directory under the server's state
        directory, and each report names the share type, storage index,
        share number and the reason given.
        """
        server = self.create("test_advise_corruption")
        aa = server.get_accountant().get_anonymous_account()

        # First report corruption through the account. No share needs to
        # exist for "si0" for the advisory to be recorded.
        si0_s = base32.b2a("si0")
        aa.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(server._statedir, "corruption-advisories")
        self.failUnless(os.path.exists(reportdir), reportdir)
        reports = fileutil.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        # The report's file name contains the base32 form of the storage index.
        self.failUnlessIn(si0_s, str(report_si0))
        report = fileutil.read(os.path.join(reportdir, report_si0))

        self.failUnlessIn("type: immutable", report)
        self.failUnlessIn("storage_index: %s" % si0_s, report)
        self.failUnlessIn("share_number: 0", report)
        self.failUnlessIn("This share smells funny.", report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        d = self.allocate(aa, "si1", [1], 75)
        def _allocated( (already, writers) ):
            self.failUnlessEqual(already, set())
            self.failUnlessEqual(set(writers.keys()), set([1]))

            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: writers[1].remote_write(0, "data"))
            d2.addCallback(lambda ign: writers[1].remote_close())

            d2.addCallback(lambda ign: aa.remote_get_buckets("si1"))
            def _got_buckets(b):
                self.failUnlessEqual(set(b.keys()), set([1]))
                # The advisory is sent on the bucket object obtained from
                # remote_get_buckets().
                b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

                # There are now two reports; pick out the one for "si1".
                reports = fileutil.listdir(reportdir)
                self.failUnlessEqual(len(reports), 2)
                report_si1 = [r for r in reports if si1_s in r][0]
                report = fileutil.read(os.path.join(reportdir, report_si1))

                self.failUnlessIn("type: immutable", report)
                self.failUnlessIn("storage_index: %s" % (si1_s,), report)
                self.failUnlessIn("share_number: 1", report)
                self.failUnlessIn("This share tastes like dust.", report)
            d2.addCallback(_got_buckets)
            return d2
        d.addCallback(_allocated)
        return d
1136
1137     def compare_leases(self, leases_a, leases_b, with_timestamps=True):
1138         self.failUnlessEqual(len(leases_a), len(leases_b))
1139         for i in range(len(leases_a)):
1140             a = leases_a[i]
1141             b = leases_b[i]
1142             self.failUnlessEqual(a.owner_num, b.owner_num)
1143             if with_timestamps:
1144                 self.failUnlessEqual(a.renewal_time, b.renewal_time)
1145                 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1146
    def OFF_test_immutable_leases(self):
        """
        Exercise lease creation and renewal for immutable shares, using
        both the anonymous account and the starter account.

        The OFF_ prefix means the name does not start with "test", so the
        test runner does not collect this method.

        NOTE(review): the result of aa.remote_allocate_buckets() is
        unpacked synchronously here, while the enabled tests in this class
        treat it as a Deferred; this would need porting before the test
        can be re-enabled.
        """
        server = self.create("test_immutable_leases")
        aa = server.get_accountant().get_anonymous_account()
        sa = server.get_accountant().get_starter_account()

        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        # NOTE(review): the workdir name "test_leases" differs from the
        # name passed to self.create() above ("test_immutable_leases"), so
        # this file may not end up under the server's share directory --
        # confirm before re-enabling.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("six"))
        os.makedirs(bucket_dir)
        fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
                       "you ought to be ignoring me\n")

        already,writers = aa.remote_allocate_buckets("si1", "", "",
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        # one lease per share that was just written
        leases = aa.get_leases("si1")
        self.failUnlessEqual(len(leases), 5)

        aa.add_share("six", 0, 0, SHARETYPE_IMMUTABLE)
        # adding a share does not immediately add a lease
        self.failUnlessEqual(len(aa.get_leases("six")), 0)

        aa.add_or_renew_default_lease("six", 0)
        self.failUnlessEqual(len(aa.get_leases("six")), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None)
        self.failUnlessEqual(len(aa.get_leases("si18")), 0)

        all_leases = aa.get_leases("si1")

        # renew the lease directly
        aa.remote_renew_lease("si1", "")
        self.failUnlessEqual(len(aa.get_leases("si1")), 5)
        # Timestamps are excluded from the comparison after a renewal.
        self.compare_leases(all_leases, aa.get_leases("si1"), with_timestamps=False)

        # Now allocate more leases using a different account.
        # A new lease should be allocated for every share in the shareset.
        sa.remote_renew_lease("si1", "")
        self.failUnlessEqual(len(aa.get_leases("si1")), 5)
        self.failUnlessEqual(len(sa.get_leases("si1")), 5)

        all_leases2 = sa.get_leases("si1")

        # Renewing again with the same account must not change the owners
        # or the number of leases.
        sa.remote_renew_lease("si1", "")
        self.compare_leases(all_leases2, sa.get_leases("si1"), with_timestamps=False)
1202
1203
class MutableServerMixin:
    """
    Helpers for the mutable-share server tests: secret derivation and
    slot allocation through remote_slot_testv_and_readv_and_writev().

    Users of this mixin must provide self._lease_secret, an iterator of
    integers used to derive per-call lease secrets.
    """

    def write_enabler(self, we_tag):
        """Return the write enabler derived from we_tag."""
        secret = hashutil.tagged_hash("we_blah", we_tag)
        return secret

    def renew_secret(self, tag):
        """Return the lease renewal secret derived from str(tag)."""
        secret = hashutil.tagged_hash("renew_blah", str(tag))
        return secret

    def cancel_secret(self, tag):
        """Return the lease cancel secret derived from str(tag)."""
        secret = hashutil.tagged_hash("cancel_blah", str(tag))
        return secret

    def allocate(self, aa, storage_index, we_tag, sharenums, size):
        """
        Create the given mutable shares by sending, for each share number,
        an empty test vector, an empty write vector and no new length.
        The returned Deferred fires after checking that the write was
        accepted and that no read data came back. The size argument is
        accepted but not used.
        """
        enabler = self.write_enabler(we_tag)

        # These secrets are not used, but clients still provide them.
        lease_tag = "%d" % (self._lease_secret.next(),)
        secrets = (enabler,
                   self.renew_secret(lease_tag),
                   self.cancel_secret(lease_tag))

        slot_write = aa.remote_slot_testv_and_readv_and_writev
        test_and_write_vectors = dict((shnum, ([], [], None))
                                      for shnum in sharenums)
        read_vector = []

        def _send(ign):
            return slot_write(storage_index, secrets,
                              test_and_write_vectors, read_vector)

        def _verify(result):
            did_write, readv_data = result
            self.failUnless(did_write)
            self.failUnless(isinstance(readv_data, dict))
            self.failUnlessEqual(len(readv_data), 0)

        d = defer.succeed(None)
        d.addCallback(_send)
        d.addCallback(_verify)
        return d
1238
1239
1240 class MutableServerTest(MutableServerMixin, ShouldFailMixin):
1241     def test_create(self):
1242         server = self.create("test_create")
1243         aa = server.get_accountant().get_anonymous_account()
1244         self.failUnless(RIStorageServer.providedBy(aa), aa)
1245
    def test_bad_magic(self):
        """
        A mutable share whose file has had its first bytes overwritten with
        "BAD MAGIC" is left out of remote_slot_readv() results, both when a
        good share exists alongside it and when it is the only share.
        """
        server = self.create("test_bad_magic")
        aa = server.get_accountant().get_anonymous_account()
        read = aa.remote_slot_readv

        # Create two empty mutable shares, 0 and 1.
        d = self.allocate(aa, "si1", "we1", set([0,1]), 10)
        # Corrupt share 0 by overwriting the start of its file.
        d.addCallback(lambda ign: server.backend.get_shareset("si1").get_share(0))
        def _got_share(share0):
            f = open(share0._get_path(), "rb+")
            try:
                f.seek(0)
                f.write("BAD MAGIC")
            finally:
                f.close()
        d.addCallback(_got_share)

        # This should ignore the corrupted share; see ticket #1566.
        # (Share 1 holds no data yet, so reading it yields an empty string.)
        d.addCallback(lambda ign: read("si1", [0,1], [(0,10)]) )
        d.addCallback(lambda res: self.failUnlessEqual(res, {1: ['']}))

        # Also if there are only corrupted shares.
        # (Remove the good share 1 so that only the corrupted share 0 is left.)
        d.addCallback(lambda ign: server.backend.get_shareset("si1").get_share(1))
        d.addCallback(lambda share: share.unlink())
        d.addCallback(lambda ign: read("si1", [0], [(0,10)]) )
        d.addCallback(lambda res: self.failUnlessEqual(res, {}))
        return d
1272
1273     def test_container_size(self):
1274         server = self.create("test_container_size")
1275         aa = server.get_accountant().get_anonymous_account()
1276         read = aa.remote_slot_readv
1277         rstaraw = aa.remote_slot_testv_and_readv_and_writev
1278         secrets = ( self.write_enabler("we1"),
1279                     self.renew_secret("we1"),
1280                     self.cancel_secret("we1") )
1281         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1282
1283         d = self.allocate(aa, "si1", "we1", set([0,1,2]), 100)
1284         d.addCallback(lambda ign: rstaraw("si1", secrets,
1285                                           {0: ([], [(0,data)], len(data)+12)},
1286                                           []))
1287         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1288
1289         # Trying to make the container too large (by sending a write vector
1290         # whose offset is too high) will raise an exception.
1291         TOOBIG = MutableDiskShare.MAX_SIZE + 10
1292         d.addCallback(lambda ign: self.shouldFail(DataTooLargeError,
1293                                                   'make container too large', None,
1294                                                   lambda: rstaraw("si1", secrets,
1295                                                                   {0: ([], [(TOOBIG,data)], None)},
1296                                                                   []) ))
1297
1298         d.addCallback(lambda ign: rstaraw("si1", secrets,
1299                                           {0: ([], [(0,data)], None)},
1300                                           []))
1301         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1302
1303         d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1304         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:10]]}))
1305
1306         # Sending a new_length shorter than the current length truncates the
1307         # data.
1308         d.addCallback(lambda ign: rstaraw("si1", secrets,
1309                                           {0: ([], [], 9)},
1310                                           []))
1311         d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1312         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:9]]}))
1313
1314         # Sending a new_length longer than the current length doesn't change
1315         # the data.
1316         d.addCallback(lambda ign: rstaraw("si1", secrets,
1317                                           {0: ([], [], 20)},
1318                                           []))
1319         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1320         d.addCallback(lambda ign: read("si1", [0], [(0, 20)]))
1321         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:9]]}))
1322
1323         # Sending a write vector whose start is after the end of the current
1324         # data doesn't reveal "whatever was there last time" (palimpsest),
1325         # but instead fills with zeroes.
1326
1327         # To test this, we fill the data area with a recognizable pattern.
1328         pattern = ''.join([chr(i) for i in range(100)])
1329         d.addCallback(lambda ign: rstaraw("si1", secrets,
1330                                           {0: ([], [(0, pattern)], None)},
1331                                           []))
1332         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1333         # Then truncate the data...
1334         d.addCallback(lambda ign: rstaraw("si1", secrets,
1335                                           {0: ([], [], 20)},
1336                                           []))
1337         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1338         # Just confirm that you get an empty string if you try to read from
1339         # past the (new) endpoint now.
1340         d.addCallback(lambda ign: rstaraw("si1", secrets,
1341                                           {0: ([], [], None)},
1342                                           [(20, 1980)]))
1343         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[''],1:[''],2:['']}) ))
1344
1345         # Then the extend the file by writing a vector which starts out past
1346         # the end...
1347         d.addCallback(lambda ign: rstaraw("si1", secrets,
1348                                           {0: ([], [(50, 'hellothere')], None)},
1349                                           []))
1350         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1351         # Now if you read the stuff between 20 (where we earlier truncated)
1352         # and 50, it had better be all zeroes.
1353         d.addCallback(lambda ign: rstaraw("si1", secrets,
1354                                           {0: ([], [], None)},
1355                                           [(20, 30)]))
1356         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:['\x00'*30],1:[''],2:['']}) ))
1357
1358         # Also see if the server explicitly declares that it supports this
1359         # feature.
1360         d.addCallback(lambda ign: aa.remote_get_version())
1361         def _check_declaration(ver):
1362             storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
1363             self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
1364         d.addCallback(_check_declaration)
1365
1366         # If the size is dropped to zero the share is deleted.
1367         d.addCallback(lambda ign: rstaraw("si1", secrets,
1368                                           {0: ([], [(0,data)], 0)},
1369                                           []))
1370         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1371
1372         d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1373         d.addCallback(lambda res: self.failUnlessEqual(res, {}))
1374         return d
1375
1376     def test_allocate(self):
1377         server = self.create("test_allocate")
1378         aa = server.get_accountant().get_anonymous_account()
1379         read = aa.remote_slot_readv
1380         write = aa.remote_slot_testv_and_readv_and_writev
1381
1382         d = self.allocate(aa, "si1", "we1", set([0,1,2]), 100)
1383
1384         d.addCallback(lambda ign: read("si1", [0], [(0, 10)]))
1385         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""]}))
1386         d.addCallback(lambda ign: read("si1", [], [(0, 10)]))
1387         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""], 1: [""], 2: [""]}))
1388         d.addCallback(lambda ign: read("si1", [0], [(100, 10)]))
1389         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""]}))
1390
1391         # try writing to one
1392         secrets = ( self.write_enabler("we1"),
1393                     self.renew_secret("we1"),
1394                     self.cancel_secret("we1") )
1395         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1396
1397         d.addCallback(lambda ign: write("si1", secrets,
1398                                         {0: ([], [(0,data)], None)},
1399                                         []))
1400         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1401
1402         d.addCallback(lambda ign: read("si1", [0], [(0,20)]))
1403         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["00000000001111111111"]}))
1404         d.addCallback(lambda ign: read("si1", [0], [(95,10)]))
1405         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["99999"]}))
1406         #d.addCallback(lambda ign: s0.remote_get_length())
1407         #d.addCallback(lambda res: self.failUnlessEqual(res, 100))
1408
1409         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
1410         d.addCallback(lambda ign: self.shouldFail(BadWriteEnablerError, 'bad write enabler',
1411                                                   "The write enabler was recorded by nodeid "
1412                                                   "'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.",
1413                                                   lambda: write("si1", bad_secrets, {}, []) ))
1414
1415         # this testv should fail
1416         d.addCallback(lambda ign: write("si1", secrets,
1417                                         {0: ([(0, 12, "eq", "444444444444"),
1418                                               (20, 5, "eq", "22222"),],
1419                                              [(0, "x"*100)],
1420                                              None)},
1421                                         [(0,12), (20,5)]))
1422         d.addCallback(lambda res: self.failUnlessEqual(res, (False,
1423                                                              {0: ["000000000011", "22222"],
1424                                                               1: ["", ""],
1425                                                               2: ["", ""]}) ))
1426         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1427         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1428
1429         # as should this one
1430         d.addCallback(lambda ign: write("si1", secrets,
1431                                         {0: ([(10, 5, "lt", "11111"),],
1432                                              [(0, "x"*100)],
1433                                              None)},
1434                                         [(10,5)]))
1435         d.addCallback(lambda res: self.failUnlessEqual(res, (False,
1436                                                              {0: ["11111"],
1437                                                               1: [""],
1438                                                               2: [""]}) ))
1439         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1440         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1441         return d
1442
1443     def test_operators(self):
1444         # test operators, the data we're comparing is '11111' in all cases.
1445         # test both fail+pass, reset data after each one.
1446         server = self.create("test_operators")
1447         aa = server.get_accountant().get_anonymous_account()
1448
1449         secrets = ( self.write_enabler("we1"),
1450                     self.renew_secret("we1"),
1451                     self.cancel_secret("we1") )
1452         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1453         write = aa.remote_slot_testv_and_readv_and_writev
1454         read = aa.remote_slot_readv
1455
1456         def _reset(ign):
1457             return write("si1", secrets,
1458                          {0: ([], [(0,data)], None)},
1459                          [])
1460
1461         d = defer.succeed(None)
1462         d.addCallback(_reset)
1463
1464         #  lt
1465         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11110"),],
1466                                                              [(0, "x"*100)],
1467                                                              None,
1468                                                             )}, [(10,5)]))
1469         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1470         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1471         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1472         d.addCallback(lambda ign: read("si1", [], [(0,100)]))
1473         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1474         d.addCallback(_reset)
1475
1476         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11111"),],
1477                                                              [(0, "x"*100)],
1478                                                              None,
1479                                                             )}, [(10,5)]))
1480         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1481         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1482         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1483         d.addCallback(_reset)
1484
1485         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11112"),],
1486                                                              [(0, "y"*100)],
1487                                                              None,
1488                                                             )}, [(10,5)]))
1489         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1490         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1491         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1492         d.addCallback(_reset)
1493
1494         #  le
1495         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11110"),],
1496                                                              [(0, "x"*100)],
1497                                                              None,
1498                                                             )}, [(10,5)]))
1499         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1500         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1501         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1502         d.addCallback(_reset)
1503
1504         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11111"),],
1505                                                              [(0, "y"*100)],
1506                                                              None,
1507                                                             )}, [(10,5)]))
1508         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1509         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1510         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1511         d.addCallback(_reset)
1512
1513         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11112"),],
1514                                                              [(0, "y"*100)],
1515                                                              None,
1516                                                             )}, [(10,5)]))
1517         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1518         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1519         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1520         d.addCallback(_reset)
1521
1522         #  eq
1523         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "eq", "11112"),],
1524                                                              [(0, "x"*100)],
1525                                                              None,
1526                                                             )}, [(10,5)]))
1527         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1528         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1529         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1530         d.addCallback(_reset)
1531
1532         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "eq", "11111"),],
1533                                                              [(0, "y"*100)],
1534                                                              None,
1535                                                             )}, [(10,5)]))
1536         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1537         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1538         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1539         d.addCallback(_reset)
1540
1541         #  ne
1542         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ne", "11111"),],
1543                                                              [(0, "x"*100)],
1544                                                              None,
1545                                                             )}, [(10,5)]))
1546         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1547         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1548         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1549         d.addCallback(_reset)
1550
1551         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ne", "11112"),],
1552                                                               [(0, "y"*100)],
1553                                                              None,
1554                                                             )}, [(10,5)]))
1555         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1556         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1557         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1558         d.addCallback(_reset)
1559
1560         #  ge
1561         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11110"),],
1562                                                              [(0, "y"*100)],
1563                                                              None,
1564                                                             )}, [(10,5)]))
1565         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1566         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1567         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1568         d.addCallback(_reset)
1569
1570         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11111"),],
1571                                                              [(0, "y"*100)],
1572                                                              None,
1573                                                             )}, [(10,5)]))
1574         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1575         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1576         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1577         d.addCallback(_reset)
1578
1579         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11112"),],
1580                                                              [(0, "y"*100)],
1581                                                              None,
1582                                                             )}, [(10,5)]))
1583         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1584         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1585         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1586         d.addCallback(_reset)
1587
1588         #  gt
1589         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11110"),],
1590                                                              [(0, "y"*100)],
1591                                                              None,
1592                                                             )}, [(10,5)]))
1593         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1594         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1595         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1596         d.addCallback(_reset)
1597
1598         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11111"),],
1599                                                              [(0, "x"*100)],
1600                                                              None,
1601                                                             )}, [(10,5)]))
1602         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1603         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1604         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1605         d.addCallback(_reset)
1606
1607         d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11112"),],
1608                                                              [(0, "x"*100)],
1609                                                              None,
1610                                                             )}, [(10,5)]))
1611         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1612         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1613         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1614         d.addCallback(_reset)
1615
1616         # finally, test some operators against empty shares
1617         d.addCallback(lambda ign: write("si1", secrets, {1: ([(10, 5, "eq", "11112"),],
1618                                                              [(0, "x"*100)],
1619                                                              None,
1620                                                             )}, [(10,5)]))
1621         d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1622         d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1623         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1624         d.addCallback(_reset)
1625         return d
1626
1627     def test_readv(self):
1628         server = self.create("test_readv")
1629         aa = server.get_accountant().get_anonymous_account()
1630
1631         secrets = ( self.write_enabler("we1"),
1632                     self.renew_secret("we1"),
1633                     self.cancel_secret("we1") )
1634         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1635         write = aa.remote_slot_testv_and_readv_and_writev
1636         read = aa.remote_slot_readv
1637         data = [("%d" % i) * 100 for i in range(3)]
1638
1639         d = defer.succeed(None)
1640         d.addCallback(lambda ign: write("si1", secrets,
1641                                         {0: ([], [(0,data[0])], None),
1642                                          1: ([], [(0,data[1])], None),
1643                                          2: ([], [(0,data[2])], None),
1644                                         }, []))
1645         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {}) ))
1646
1647         d.addCallback(lambda ign: read("si1", [], [(0, 10)]))
1648         d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["0"*10],
1649                                                              1: ["1"*10],
1650                                                              2: ["2"*10]}))
1651         return d
1652
1653     def test_writev(self):
1654         # This is run for both the disk and cloud backends, but it is particularly
1655         # designed to exercise the cloud backend's implementation of chunking for
1656         # mutable shares, assuming that PREFERRED_CHUNK_SIZE has been patched to 500.
1657         # Note that the header requires 472 bytes, so only the first 28 bytes of data are
1658         # in the first chunk.
1659
1660         server = self.create("test_writev")
1661         aa = server.get_accountant().get_anonymous_account()
1662         read = aa.remote_slot_readv
1663         rstaraw = aa.remote_slot_testv_and_readv_and_writev
1664         secrets = ( self.write_enabler("we1"),
1665                     self.renew_secret("we1"),
1666                     self.cancel_secret("we1") )
1667
1668         def _check(ign, writev, expected_data, expected_write_loads, expected_write_stores,
1669                    expected_read_loads, should_exist):
1670             d2 = rstaraw("si1", secrets, {0: writev}, [])
1671             if should_exist:
1672                 d2.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[]}) ))
1673             else:
1674                 d2.addCallback(lambda res: self.failUnlessEqual(res, (True, {}) ))
1675             d2.addCallback(lambda ign: self.check_load_store_counts(expected_write_loads,
1676                                                                     expected_write_stores))
1677             d2.addCallback(lambda ign: self.reset_load_store_counts())
1678
1679             d2.addCallback(lambda ign: read("si1", [0], [(0, len(expected_data) + 1)]))
1680             if expected_data == "":
1681                 d2.addCallback(lambda res: self.failUnlessEqual(res, {}))
1682             else:
1683                 d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [expected_data]}))
1684             d2.addCallback(lambda ign: self.check_load_store_counts(expected_read_loads, 0))
1685             d2.addCallback(lambda ign: self.reset_load_store_counts())
1686             return d2
1687
1688         self.reset_load_store_counts()
1689         d = self.allocate(aa, "si1", "we1", set([0]), 2725)
1690         d.addCallback(_check, ([], [(0, "a"*10)], None),
1691                               "a"*10,
1692                               1, 2, 1, True)
1693         d.addCallback(_check, ([], [(20, "b"*18)], None),
1694                               "a"*10 + "\x00"*10 + "b"*18,
1695                               1, 2, 2, True)
1696         d.addCallback(_check, ([], [(1038, "c")], None),
1697                               "a"*10 + "\x00"*10 + "b"*18 + "\x00"*(490+500+10) + "c",
1698                               2, 4, 4, True)
1699         d.addCallback(_check, ([], [(0, "d"*1038)], None),
1700                               "d"*1038 + "c",
1701                               2, 4, 4, True)
1702         d.addCallback(_check, ([], [(2167, "a"*54)], None),
1703                               "d"*1038 + "c" + "\x00"*1128 + "a"*54,
1704                               2, 4, 6, True)
1705         # This pattern was observed from the MDMF publisher in v1.9.1.
1706         # Notice the duplicated write of length 41 bytes at offset 0.
1707         d.addCallback(_check, ([], [(2167, "e"*54), (123, "f"*347), (2221, "g"*32), (470, "h"*136),
1708                                     (0, "i"*41), (606, "j"*66), (672, "k"*93), (59, "l"*64),
1709                                     (41, "m"*18), (0, "i"*41)], None),
1710                               "i"*41 + "m"*18 + "l"*64 + "f"*347 + "h"*136 + "j"*66 + "k"*93 + "d"*273 + "c" + "\x00"*1128 +
1711                               "e"*54 + "g"*32,
1712                               4, 4, 6, True)
1713         # This should delete all chunks.
1714         d.addCallback(_check, ([], [], 0),
1715                               "",
1716                               1, 0, 0, True)
1717         d.addCallback(_check, ([], [(2167, "e"*54), (123, "f"*347), (2221, "g"*32), (470, "h"*136),
1718                                     (0, "i"*41), (606, "j"*66), (672, "k"*93), (59, "l"*64),
1719                                     (41, "m"*18), (0, "i"*41)], None),
1720                               "i"*41 + "m"*18 + "l"*64 + "f"*347 + "h"*136 + "j"*66 + "k"*93 + "\x00"*1402 +
1721                               "e"*54 + "g"*32,
1722                               0, 7, 6, False)
1723         return d
1724
1725     def test_remove(self):
1726         server = self.create("test_remove")
1727         aa = server.get_accountant().get_anonymous_account()
1728         readv = aa.remote_slot_readv
1729         writev = aa.remote_slot_testv_and_readv_and_writev
1730         secrets = ( self.write_enabler("we1"),
1731                     self.renew_secret("we1"),
1732                     self.cancel_secret("we1") )
1733
1734         d = defer.succeed(None)
1735         d.addCallback(lambda ign: self.allocate(aa, "si1", "we1", set([0,1,2]), 100))
1736         # delete sh0 by setting its size to zero
1737         d.addCallback(lambda ign: writev("si1", secrets,
1738                                          {0: ([], [], 0)},
1739                                          []))
1740         # the answer should mention all the shares that existed before the
1741         # write
1742         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1743         # but a new read should show only sh1 and sh2
1744         d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1745         d.addCallback(lambda res: self.failUnlessEqual(res, {1: [""], 2: [""]}))
1746
1747         # delete sh1 by setting its size to zero
1748         d.addCallback(lambda ign: writev("si1", secrets,
1749                                          {1: ([], [], 0)},
1750                                          []))
1751         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {1:[],2:[]}) ))
1752         d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1753         d.addCallback(lambda res: self.failUnlessEqual(res, {2: [""]}))
1754
1755         # delete sh2 by setting its size to zero
1756         d.addCallback(lambda ign: writev("si1", secrets,
1757                                          {2: ([], [], 0)},
1758                                          []))
1759         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {2:[]}) ))
1760         d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1761         d.addCallback(lambda res: self.failUnlessEqual(res, {}))
1762
1763         d.addCallback(lambda ign: server.backend.get_shareset("si1").get_overhead())
1764         d.addCallback(lambda overhead: self.failUnlessEqual(overhead, 0))
1765
1766         # and the shareset directory should now be gone. This check is only
1767         # applicable to the disk backend.
1768         def _check_gone(ign):
1769             si = base32.b2a("si1")
1770             # note: this is a detail of the disk backend, and may change in the future
1771             prefix = si[:2]
1772             prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1773             sidir = os.path.join(prefixdir, si)
1774             self.failUnless(os.path.exists(prefixdir), prefixdir)
1775             self.failIf(os.path.exists(sidir), sidir)
1776
1777         if isinstance(server.backend, DiskBackend):
1778             d.addCallback(_check_gone)
1779         return d
1780
1781     def compare_leases(self, leases_a, leases_b, with_timestamps=True):
1782         self.failUnlessEqual(len(leases_a), len(leases_b))
1783         for i in range(len(leases_a)):
1784             a = leases_a[i]
1785             b = leases_b[i]
1786             self.failUnlessEqual(a.owner_num, b.owner_num)
1787             if with_timestamps:
1788                 self.failUnlessEqual(a.renewal_time, b.renewal_time)
1789                 self.failUnlessEqual(a.expiration_time, b.expiration_time)
1790
1791     def test_mutable_leases(self):
1792         server = self.create("test_mutable_leases")
1793         aa = server.get_accountant().get_anonymous_account()
1794         sa = server.get_accountant().get_starter_account()
1795
1796         def secrets(n):
1797             return ( self.write_enabler("we1"),
1798                      self.renew_secret("we1-%d" % n),
1799                      self.cancel_secret("we1-%d" % n) )
1800         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1801         aa_write = aa.remote_slot_testv_and_readv_and_writev
1802         sa_write = sa.remote_slot_testv_and_readv_and_writev
1803         read = aa.remote_slot_readv
1804
1805         # There is no such method as remote_cancel_lease -- see ticket #1528.
1806         self.failIf(hasattr(aa, 'remote_cancel_lease'),
1807                     "aa should not have a 'remote_cancel_lease' method/attribute")
1808
1809         # create a random non-numeric file in the bucket directory, to
1810         # exercise the code that's supposed to ignore those.
1811         bucket_dir = os.path.join(self.workdir("test_leases"),
1812                                   "shares", storage_index_to_dir("six"))
1813         os.makedirs(bucket_dir)
1814         fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
1815                        "you ought to be ignoring me\n")
1816
1817         create_mutable_disk_share(os.path.join(bucket_dir, "0"), server.get_serverid(),
1818                                   secrets(0)[0], storage_index="six", shnum=0)
1819
1820         aa.add_share("six", 0, 0, SHARETYPE_MUTABLE)
1821         # adding a share does not immediately add a lease
1822         self.failUnlessEqual(len(aa.get_leases("six")), 0)
1823
1824         aa.add_or_renew_default_lease("six", 0)
1825         self.failUnlessEqual(len(aa.get_leases("six")), 1)
1826
1827         d = defer.succeed(None)
1828
1829         d.addCallback(lambda ign: aa_write("si0", secrets(1), {0: ([], [(0,data)], None)}, []))
1830         d.addCallback(lambda res: self.failUnlessEqual(res, (True, {})))
1831
1832         # add-lease on a missing storage index is silently ignored
1833         d.addCallback(lambda ign: aa.remote_add_lease("si18", "", ""))
1834         d.addCallback(lambda res: self.failUnless(res is None, res))
1835         d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si18")), 0))
1836
1837         # create a lease by writing
1838         d.addCallback(lambda ign: aa_write("si1", secrets(2), {0: ([], [(0,data)], None)}, []))
1839         d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si1")), 1))
1840
1841         # renew it directly
1842         d.addCallback(lambda ign: aa.remote_renew_lease("si1", secrets(2)[1]))
1843         d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si1")), 1))
1844
1845         # now allocate another lease using a different account
1846         d.addCallback(lambda ign: sa_write("si1", secrets(3), {0: ([], [(0,data)], None)}, []))
1847         def _check(ign):
1848             aa_leases = aa.get_leases("si1")
1849             sa_leases = sa.get_leases("si1")
1850
1851             self.failUnlessEqual(len(aa_leases), 1)
1852             self.failUnlessEqual(len(sa_leases), 1)
1853
1854             d2 = defer.succeed(None)
1855             d2.addCallback(lambda ign: aa.remote_renew_lease("si1", secrets(2)[1]))
1856             d2.addCallback(lambda ign: self.compare_leases(aa_leases, aa.get_leases("si1"),
1857                                                            with_timestamps=False))
1858
1859             d2.addCallback(lambda ign: sa.remote_renew_lease("si1", "shouldn't matter"))
1860             d2.addCallback(lambda ign: self.compare_leases(sa_leases, sa.get_leases("si1"),
1861                                                            with_timestamps=False))
1862
1863             # Get a new copy of the leases, with the current timestamps. Reading
1864             # data should leave the timestamps alone.
1865             d2.addCallback(lambda ign: aa.get_leases("si1"))
1866             def _check2(new_aa_leases):
1867                 # reading shares should not modify the timestamp
1868                 d3 = read("si1", [], [(0, 200)])
1869                 d3.addCallback(lambda ign: self.compare_leases(new_aa_leases, aa.get_leases("si1"),
1870                                                                with_timestamps=False))
1871
1872                 d3.addCallback(lambda ign: aa_write("si1", secrets(2),
1873                       {0: ([], [(500, "make me bigger")], None)}, []))
1874                 d3.addCallback(lambda ign: self.compare_leases(new_aa_leases, aa.get_leases("si1"),
1875                                                                with_timestamps=False))
1876                 return d3
1877             d2.addCallback(_check2)
1878             return d2
1879         d.addCallback(_check)
1880         return d
1881
1882
class ServerWithNullBackend(ServiceParentMixin, WorkdirMixin, ServerMixin, unittest.TestCase):
    """Tests for a StorageServer that is constructed on top of a NullBackend."""

    def test_null_backend(self):
        # Shares allocated and written on a null backend must still be listed
        # afterwards, but reading from them yields an empty string.
        expected_shnums = set([0, 1, 2])
        basedir = self.workdir("test_null_backend")
        server = StorageServer("\x00" * 20, NullBackend(), basedir)
        server.setServiceParent(self.sparent)
        account = server.get_accountant().get_anonymous_account()

        def _read_share_zero(buckets):
            self.failUnlessEqual(set(buckets.keys()), expected_shnums)
            d_read = defer.succeed(None)
            d_read.addCallback(lambda ign: buckets[0].remote_read(0, 25))
            d_read.addCallback(lambda data: self.failUnlessEqual(data, ""))
            return d_read

        def _write_then_inspect(allocation):
            already, writers = allocation
            self.failUnlessEqual(already, set())
            self.failUnlessEqual(set(writers.keys()), expected_shnums)

            d_write = for_items(self._write_and_close, writers)
            # The shares should be present but have no data.
            d_write.addCallback(lambda ign: account.remote_get_buckets("vid"))
            d_write.addCallback(_read_share_zero)
            return d_write

        d = self.allocate(account, "vid", [0, 1, 2], 75)
        d.addCallback(_write_then_inspect)
        return d
1910
1911
class WithMockCloudBackend(ServiceParentMixin, WorkdirMixin):
    """
    Mixin whose create() builds a server on top of a CloudBackend wrapping a
    MockContainer, and which can reset and check that container's load and
    store counters.
    """

    def create(self, name, detached=False, readonly=False, reserved_space=0, klass=StorageServer):
        """
        Return a new 'klass' instance backed by a fresh MockContainer that is
        rooted at this test's working directory for 'name'.

        Unless 'detached' is true the server is attached to self.sparent.
        'readonly' must be false here; 'reserved_space' is accepted but is
        not used by this method.
        """
        assert not readonly
        basedir = self.workdir(name)
        container = MockContainer(basedir)
        self._container = container
        cloud_backend = CloudBackend(container)
        stats = FakeStatsProvider()
        server = klass("\x00" * 20, cloud_backend, basedir, stats_provider=stats)
        if detached:
            return server
        server.setServiceParent(self.sparent)
        return server

    def reset_load_store_counts(self):
        """Reset the load/store counters of the container made by create()."""
        self._container.reset_load_store_counts()

    def check_load_store_counts(self, expected_load_count, expected_store_count):
        """Fail unless the container's (load, store) counts match the expected pair."""
        container = self._container
        actual = (container.get_load_count(), container.get_store_count())
        expected = (expected_load_count, expected_store_count)
        self.failUnlessEqual(actual, expected)
1930
1931
class WithDiskBackend(ServiceParentMixin, WorkdirMixin):
    """
    Mixin whose create() builds a server on top of a DiskBackend. The
    load/store counter hooks are present but do nothing.
    """

    def create(self, name, detached=False, readonly=False, reserved_space=0, klass=StorageServer):
        """
        Return a new 'klass' instance backed by a DiskBackend that lives in
        this test's working directory for 'name'.

        'readonly' and 'reserved_space' are handed to the DiskBackend. Unless
        'detached' is true the server is attached to self.sparent.
        """
        basedir = self.workdir(name)
        disk_backend = DiskBackend(basedir, readonly=readonly, reserved_space=reserved_space)
        stats = FakeStatsProvider()
        server = klass("\x00" * 20, disk_backend, basedir, stats_provider=stats)
        if detached:
            return server
        server.setServiceParent(self.sparent)
        return server

    def reset_load_store_counts(self):
        """Do nothing; nothing is counted by this mixin."""

    def check_load_store_counts(self, expected_loads, expected_stores):
        """Do nothing; the expected counts are ignored by this mixin."""
1947
1948
class ServerWithMockCloudBackend(WithMockCloudBackend, ServerTest, unittest.TestCase):
    """
    Runs the ServerTest tests against the mock cloud backend and adds tests
    for retrying failed container writes.
    """

    def setUp(self):
        ServiceParentMixin.setUp(self)

        overrides = (
            # A smaller chunk size causes the tests to exercise more cases in
            # the chunking implementation.
            (cloud_common, 'PREFERRED_CHUNK_SIZE', 500),
            # This causes ContainerListMixin to be exercised.
            (mock_cloud, 'MAX_KEYS', 2),
        )
        for module, attribute, value in overrides:
            self.patch(module, attribute, value)

    def test_bad_container_version(self):
        # Delegates to the inherited test; redefined here so that the 'todo'
        # marker below applies to this class only.
        return ServerTest.test_bad_container_version(self)
    test_bad_container_version.todo = "The cloud backend hasn't been modified to fix ticket #1566."

    def _describe_level(self, level):
        """
        Return the entry for 'level' in LogEvent.LEVELMAP, or str(level) when
        there is no such map or no such entry.
        """
        level_names = getattr(LogEvent, 'LEVELMAP', {})
        fallback = str(level)
        return level_names.get(level, fallback)

    def _test_cloud_retry(self, name, failure_count, levels):
        """
        Allocate and write share 0 of "vid" on a fresh server while the first
        'failure_count' calls to MockContainer._put_object fail with a
        CloudServiceError.

        Every level above OPERATIONAL that is passed to cloud_common.log.msg
        is appended to 'levels'. Returns the Deferred for the whole
        allocate-and-write sequence.
        """
        self.patch(cloud_common, 'BACKOFF_SECONDS_BEFORE_RETRY', (0, 0.1, 0.2))

        attempts = {'count': 0}
        real_put_object = MockContainer._put_object
        def call_put_object(self, ign, object_name, data, content_type=None, metadata={}):
            attempts['count'] += 1
            if attempts['count'] > failure_count:
                return real_put_object(self, ign, object_name, data,
                                       content_type=content_type, metadata=metadata)
            return defer.fail(CloudServiceError("XML", 500, "Internal error", "response"))
        self.patch(MockContainer, '_put_object', call_put_object)

        def call_log_msg(*args, **kwargs):
            # the log message and parameters should not include the data
            needle = "%25d" % (0,)
            self.failIfIn(needle, repr( (args, kwargs) ))
            level = kwargs.get("level", OPERATIONAL)
            if level > OPERATIONAL:
                levels.append(level)
        self.patch(cloud_common.log, 'msg', call_log_msg)

        server = self.create(name)
        account = server.get_accountant().get_anonymous_account()

        def _write_shares(allocation):
            already, writers = allocation
            return for_items(self._write_and_close, writers)

        d = self.allocate(account, "vid", [0], 75)
        d.addCallback(_write_shares)
        return d

    def test_cloud_retry_fail(self):
        # Four failing writes must surface as a CloudError, after four
        # INFREQUENT log messages followed by one WEIRD one.
        levels = [] # list of logging levels above OPERATIONAL for calls to log.msg
        d = self._test_cloud_retry("test_cloud_retry_fail", 4, levels)

        # shouldFail would check repr(res.value.args[0]) which is not what we want
        def _expect_cloud_error(res):
            if not isinstance(res, Failure):
                self.fail("was supposed to raise CloudError, not get %r" % (res,))
            res.trap(cloud_common.CloudError)
            message = str(res.value)
            self.failUnlessIn(", 500, 'Internal error', 'response')", message)
            # the stringified exception should not include the data
            self.failIfIn("%25d" % (0,), str(res.value))
            desc = ", ".join([self._describe_level(level) for level in levels])
            self.failUnlessEqual(levels, [INFREQUENT]*4 + [WEIRD], desc)
        d.addBoth(_expect_cloud_error)
        return d

    def test_cloud_retry_succeed(self):
        # Three failing writes followed by a successful one must log three
        # INFREQUENT messages followed by one WEIRD one.
        levels = [] # list of logging levels above OPERATIONAL for calls to log.msg
        d = self._test_cloud_retry("test_cloud_retry_succeed", 3, levels)

        def _expect_logged_levels(res):
            desc = ", ".join([self._describe_level(level) for level in levels])
            self.failUnlessEqual(levels, [INFREQUENT]*3 + [WEIRD], desc)
        d.addCallback(_expect_logged_levels)
        return d
2020
2021
2022 class ServerWithDiskBackend(WithDiskBackend, ServerTest, unittest.TestCase):
2023     # The following tests are for behaviour that is only supported by a disk backend.
2024
2025     def test_readonly(self):
2026         server = self.create("test_readonly", readonly=True)
2027         aa = server.get_accountant().get_anonymous_account()
2028
2029         d = self.allocate(aa, "vid", [0,1,2], 75)
2030         def _allocated( (already, writers) ):
2031             self.failUnlessEqual(already, set())
2032             self.failUnlessEqual(writers, {})
2033
2034             stats = server.get_stats()
2035             self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
2036             if "storage_server.disk_avail" in stats:
2037                 # Some platforms may not have an API to get disk stats.
2038                 # But if there are stats, readonly_storage means disk_avail=0
2039                 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
2040         d.addCallback(_allocated)
2041         return d
2042
2043     def test_large_share(self):
2044         syslow = platform.system().lower()
2045         if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
2046             raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
2047
2048         avail = fileutil.get_available_space('.', 512*2**20)
2049         if avail <= 4*2**30:
2050             raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
2051
2052         server = self.create("test_large_share")
2053         aa = server.get_accountant().get_anonymous_account()
2054
2055         d = self.allocate(aa, "allocate", [0], 2**32+2)
2056         def _allocated( (already, writers) ):
2057             self.failUnlessEqual(already, set())
2058             self.failUnlessEqual(set(writers.keys()), set([0]))
2059
2060             shnum, bucket = writers.items()[0]
2061
2062             # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
2063             d2 = defer.succeed(None)
2064             d2.addCallback(lambda ign: bucket.remote_write(2**32, "ab"))
2065             d2.addCallback(lambda ign: bucket.remote_close())
2066
2067             d2.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
2068             d2.addCallback(lambda readers: readers[shnum].remote_read(2**32, 2))
2069             d2.addCallback(lambda res: self.failUnlessEqual(res, "ab"))
2070             return d2
2071         d.addCallback(_allocated)
2072         return d
2073
2074     def test_remove_incoming(self):
2075         server = self.create("test_remove_incoming")
2076         aa = server.get_accountant().get_anonymous_account()
2077
2078         d = self.allocate(aa, "vid", range(3), 25)
2079         def _write_and_check( (already, writers) ):
2080             d2 = defer.succeed(None)
2081             for i, bw in sorted(writers.items()):
2082                 incoming_share_home = bw._share._get_path()
2083                 d2.addCallback(self._write_and_close, i, bw)
2084
2085             def _check(ign):
2086                 incoming_si_dir = os.path.dirname(incoming_share_home)
2087                 incoming_prefix_dir = os.path.dirname(incoming_si_dir)
2088                 incoming_dir = os.path.dirname(incoming_prefix_dir)
2089
2090                 self.failIf(os.path.exists(incoming_si_dir), incoming_si_dir)
2091                 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
2092                 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
2093             d2.addCallback(_check)
2094             return d2
2095         d.addCallback(_write_and_check)
2096         return d
2097
2098     def test_abort(self):
2099         # remote_abort, when called on a writer, should make sure that
2100         # the allocated size of the bucket is not counted by the storage
2101         # server when accounting for space.
2102         server = self.create("test_abort")
2103         aa = server.get_accountant().get_anonymous_account()
2104
2105         d = self.allocate(aa, "allocate", [0, 1, 2], 150)
2106         def _allocated( (already, writers) ):
2107             self.failIfEqual(server.allocated_size(), 0)
2108
2109             # Now abort the writers.
2110             d2 = for_items(self._abort_writer, writers)
2111             d2.addCallback(lambda ign: self.failUnlessEqual(server.allocated_size(), 0))
2112             return d2
2113         d.addCallback(_allocated)
2114         return d
2115
2116     def test_disconnect(self):
2117         # simulate a disconnection
2118         server = self.create("test_disconnect")
2119         aa = server.get_accountant().get_anonymous_account()
2120         canary = FakeCanary()
2121
2122         d = self.allocate(aa, "disconnect", [0,1,2], 75, canary)
2123         def _allocated( (already, writers) ):
2124             self.failUnlessEqual(already, set())
2125             self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
2126             for (f,args,kwargs) in canary.disconnectors.values():
2127                 f(*args, **kwargs)
2128         d.addCallback(_allocated)
2129
2130         # returning from _allocated ought to delete the incoming shares
2131         d.addCallback(lambda ign: self.allocate(aa, "disconnect", [0,1,2], 75))
2132         def _allocated2( (already, writers) ):
2133             self.failUnlessEqual(already, set())
2134             self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
2135         d.addCallback(_allocated2)
2136         return d
2137
2138     @mock.patch('allmydata.util.fileutil.get_disk_stats')
2139     def test_reserved_space(self, mock_get_disk_stats):
2140         reserved_space=10000
2141         mock_get_disk_stats.return_value = {
2142             'free_for_nonroot': 15000,
2143             'avail': max(15000 - reserved_space, 0),
2144             }
2145
2146         server = self.create("test_reserved_space", reserved_space=reserved_space)
2147         aa = server.get_accountant().get_anonymous_account()
2148
2149         # 15k available, 10k reserved, leaves 5k for shares
2150
2151         # a newly created and filled share incurs this much overhead, beyond
2152         # the size we request.
2153         OVERHEAD = 3*4
2154         LEASE_SIZE = 4+32+32+4
2155         canary = FakeCanary(True)
2156
2157         d = self.allocate(aa, "vid1", [0,1,2], 1000, canary)
2158         def _allocated( (already, writers) ):
2159             self.failUnlessEqual(len(writers), 3)
2160             # now the StorageServer should have 3000 bytes provisionally
2161             # allocated, allowing only 2000 more to be claimed
2162             self.failUnlessEqual(len(server._active_writers), 3)
2163             self.writers = writers
2164             del already
2165
2166             # allocating 1001-byte shares only leaves room for one
2167             d2 = self.allocate(aa, "vid2", [0,1,2], 1001, canary)
2168             def _allocated2( (already2, writers2) ):
2169                 self.failUnlessEqual(len(writers2), 1)
2170                 self.failUnlessEqual(len(server._active_writers), 4)
2171
2172                 # we abandon the first set, so their provisional allocation should be
2173                 # returned
2174                 d3 = for_items(self._abort_writer, self.writers)
2175                 #def _del_writers(ign):
2176                 #    del self.writers
2177                 #d3.addCallback(_del_writers)
2178                 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 1))
2179
2180                 # and we close the second set, so their provisional allocation should
2181                 # become real, long-term allocation, and grows to include the
2182                 # overhead.
2183                 d3.addCallback(lambda ign: for_items(self._write_and_close, writers2))
2184                 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 0))
2185                 return d3
2186             d2.addCallback(_allocated2)
2187
2188             allocated = 1001 + OVERHEAD + LEASE_SIZE
2189
2190             # we have to manually increase available, since we're not doing real
2191             # disk measurements
2192             def _mock(ign):
2193                 mock_get_disk_stats.return_value = {
2194                     'free_for_nonroot': 15000 - allocated,
2195                     'avail': max(15000 - allocated - reserved_space, 0),
2196                     }
2197             d2.addCallback(_mock)
2198
2199             # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
2200             # 5000-1085=3915 free, therefore we can fit 39 100byte shares
2201             d2.addCallback(lambda ign: self.allocate(aa, "vid3", range(100), 100, canary))
2202             def _allocated3( (already3, writers3) ):
2203                 self.failUnlessEqual(len(writers3), 39)
2204                 self.failUnlessEqual(len(server._active_writers), 39)
2205
2206                 d3 = for_items(self._abort_writer, writers3)
2207                 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 0))
2208                 d3.addCallback(lambda ign: server.disownServiceParent())
2209                 return d3
2210             d2.addCallback(_allocated3)
2211         d.addCallback(_allocated)
2212         return d
2213
2214     def OFF_test_immutable_leases(self):
2215         server = self.create("test_immutable_leases")
2216         aa = server.get_accountant().get_anonymous_account()
2217         canary = FakeCanary()
2218         sharenums = range(5)
2219         size = 100
2220
2221         rs = []
2222         cs = []
2223         for i in range(6):
2224             rs.append(hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
2225             cs.append(hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
2226
2227         d = aa.remote_allocate_buckets("si0", rs[0], cs[0],
2228                                        sharenums, size, canary)
2229         def _allocated( (already, writers) ):
2230             self.failUnlessEqual(len(already), 0)
2231             self.failUnlessEqual(len(writers), 5)
2232
2233             d2 = for_items(self._close_writer, writers)
2234
2235             d2.addCallback(lambda ign: list(aa.get_leases("si0")))
2236             d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 1))
2237
2238             d2.addCallback(lambda ign: aa.remote_allocate_buckets("si1", rs[1], cs[1],
2239                                                                   sharenums, size, canary))
2240             return d2
2241         d.addCallback(_allocated)
2242
2243         def _allocated2( (already, writers) ):
2244             d2 = for_items(self._close_writer, writers)
2245
2246             # take out a second lease on si1
2247             d2.addCallback(lambda ign: aa.remote_allocate_buckets("si1", rs[2], cs[2],
2248                                                                   sharenums, size, canary))
2249             return d2
2250         d.addCallback(_allocated2)
2251
2252         def _allocated2a( (already, writers) ):
2253             self.failUnlessEqual(len(already), 5)
2254             self.failUnlessEqual(len(writers), 0)
2255
2256             d2 = defer.succeed(None)
2257             d2.addCallback(lambda ign: list(aa.get_leases("si1")))
2258             d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 2))
2259
2260             # and a third lease, using add-lease
2261             d2.addCallback(lambda ign: aa.remote_add_lease("si1", rs[3], cs[3]))
2262
2263             d2.addCallback(lambda ign: list(aa.get_leases("si1")))
2264             d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 3))
2265
2266             # add-lease on a missing storage index is silently ignored
2267             d2.addCallback(lambda ign: aa.remote_add_lease("si18", "", ""))
2268             d2.addCallback(lambda res: self.failUnlessEqual(res, None))
2269
2270             # check that si0 is readable
2271             d2.addCallback(lambda ign: aa.remote_get_buckets("si0"))
2272             d2.addCallback(lambda readers: self.failUnlessEqual(len(readers), 5))
2273
2274             # renew the first lease. Only the proper renew_secret should work
2275             d2.addCallback(lambda ign: aa.remote_renew_lease("si0", rs[0]))
2276             d2.addCallback(lambda ign: self.shouldFail(IndexError, 'wrong secret 1', None,
2277                                                        lambda: aa.remote_renew_lease("si0", cs[0]) ))
2278             d2.addCallback(lambda ign: self.shouldFail(IndexError, 'wrong secret 2', None,
2279                                                        lambda: aa.remote_renew_lease("si0", rs[1]) ))
2280
2281             # check that si0 is still readable
2282             d2.addCallback(lambda ign: aa.remote_get_buckets("si0"))
2283             d2.addCallback(lambda readers: self.failUnlessEqual(len(readers), 5))
2284
2285             # There is no such method as remote_cancel_lease for now -- see
2286             # ticket #1528.
2287             d2.addCallback(lambda ign: self.failIf(hasattr(aa, 'remote_cancel_lease'),
2288                                                    "aa should not have a 'remote_cancel_lease' method/attribute"))
2289
2290             # test overlapping uploads
2291             d2.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[4], cs[4],
2292                                                                   sharenums, size, canary))
2293             return d2
2294         d.addCallback(_allocated2a)
2295
2296         def _allocated4( (already, writers) ):
2297             self.failUnlessEqual(len(already), 0)
2298             self.failUnlessEqual(len(writers), 5)
2299
2300             d2 = defer.succeed(None)
2301             d2.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[5], cs[5],
2302                                                                   sharenums, size, canary))
2303             def _allocated5( (already2, writers2) ):
2304                 self.failUnlessEqual(len(already2), 0)
2305                 self.failUnlessEqual(len(writers2), 0)
2306
2307                 d3 = for_items(self._close_writer, writers)
2308
2309                 d3.addCallback(lambda ign: list(aa.get_leases("si3")))
2310                 d3.addCallback(lambda leases: self.failUnlessEqual(len(leases), 1))
2311
2312                 d3.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[5], cs[5],
2313                                                                       sharenums, size, canary))
2314                 return d3
2315             d2.addCallback(_allocated5)
2316
2317             def _allocated6( (already3, writers3) ):
2318                 self.failUnlessEqual(len(already3), 5)
2319                 self.failUnlessEqual(len(writers3), 0)
2320
2321                 d3 = defer.succeed(None)
2322                 d3.addCallback(lambda ign: list(aa.get_leases("si3")))
2323                 d3.addCallback(lambda leases: self.failUnlessEqual(len(leases), 2))
2324                 return d3
2325             d2.addCallback(_allocated6)
2326             return d2
2327         d.addCallback(_allocated4)
2328         return d
2329
2330
class MutableServerWithMockCloudBackend(WithMockCloudBackend, MutableServerTest, unittest.TestCase):
    """Runs the MutableServerTest tests against the mock cloud backend."""

    def setUp(self):
        ServiceParentMixin.setUp(self)

        overrides = (
            # A smaller chunk size causes the tests to exercise more cases in
            # the chunking implementation.
            (cloud_common, 'PREFERRED_CHUNK_SIZE', 500),
            # This causes ContainerListMixin to be exercised.
            (mock_cloud, 'MAX_KEYS', 2),
        )
        for module, attribute, value in overrides:
            self.patch(module, attribute, value)

    def test_bad_magic(self):
        # Delegates to the inherited test; redefined here so that the 'todo'
        # marker below applies to this class only.
        return MutableServerTest.test_bad_magic(self)
    test_bad_magic.todo = "The cloud backend hasn't been modified to fix ticket #1566."
2344
2345
class MutableServerWithDiskBackend(WithDiskBackend, MutableServerTest, unittest.TestCase):
    """
    Runs the MutableServerTest tests against the disk backend.

    There are no mutable tests specific to a disk backend.
    """
2349
2350
2351 class MDMFProxies(WithDiskBackend, ShouldFailMixin, unittest.TestCase):
2352     def init(self, name):
2353         self._lease_secret = itertools.count()
2354         self.server = self.create(name)
2355         self.aa = self.server.get_accountant().get_anonymous_account()
2356         self.rref = RemoteBucket()
2357         self.rref.target = self.aa
2358         self.secrets = (self.write_enabler("we_secret"),
2359                         self.renew_secret("renew_secret"),
2360                         self.cancel_secret("cancel_secret"))
2361         self.segment = "aaaaaa"
2362         self.block = "aa"
2363         self.salt = "a" * 16
2364         self.block_hash = "a" * 32
2365         self.block_hash_tree = [self.block_hash for i in xrange(6)]
2366         self.share_hash = self.block_hash
2367         self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
2368         self.signature = "foobarbaz"
2369         self.verification_key = "vvvvvv"
2370         self.encprivkey = "private"
2371         self.root_hash = self.block_hash
2372         self.salt_hash = self.root_hash
2373         self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
2374         self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
2375         self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
2376         # blockhashes and salt hashes are serialized in the same way,
2377         # only we lop off the first element and store that in the
2378         # header.
2379         self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
2380
2381     def write_enabler(self, we_tag):
2382         return hashutil.tagged_hash("we_blah", we_tag)
2383
2384     def renew_secret(self, tag):
2385         return hashutil.tagged_hash("renew_blah", str(tag))
2386
2387     def cancel_secret(self, tag):
2388         return hashutil.tagged_hash("cancel_blah", str(tag))
2389
2390     def build_test_mdmf_share(self, tail_segment=False, empty=False):
2391         # Start with the checkstring
2392         data = struct.pack(">BQ32s",
2393                            1,
2394                            0,
2395                            self.root_hash)
2396         self.checkstring = data
2397         # Next, the encoding parameters
2398         if tail_segment:
2399             data += struct.pack(">BBQQ",
2400                                 3,
2401                                 10,
2402                                 6,
2403                                 33)
2404         elif empty:
2405             data += struct.pack(">BBQQ",
2406                                 3,
2407                                 10,
2408                                 0,
2409                                 0)
2410         else:
2411             data += struct.pack(">BBQQ",
2412                                 3,
2413                                 10,
2414                                 6,
2415                                 36)
2416         # Now we'll build the offsets.
2417         sharedata = ""
2418         if not tail_segment and not empty:
2419             for i in xrange(6):
2420                 sharedata += self.salt + self.block
2421         elif tail_segment:
2422             for i in xrange(5):
2423                 sharedata += self.salt + self.block
2424             sharedata += self.salt + "a"
2425
2426         # The encrypted private key comes after the shares + salts
2427         offset_size = struct.calcsize(MDMFOFFSETS)
2428         encrypted_private_key_offset = len(data) + offset_size
2429         # The share has chain comes after the private key
2430         sharehashes_offset = encrypted_private_key_offset + \
2431             len(self.encprivkey)
2432
2433         # The signature comes after the share hash chain.
2434         signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
2435
2436         verification_key_offset = signature_offset + len(self.signature)
2437         verification_key_end = verification_key_offset + \
2438             len(self.verification_key)
2439
2440         share_data_offset = offset_size
2441         share_data_offset += PRIVATE_KEY_SIZE
2442         share_data_offset += SIGNATURE_SIZE
2443         share_data_offset += VERIFICATION_KEY_SIZE
2444         share_data_offset += SHARE_HASH_CHAIN_SIZE
2445
2446         blockhashes_offset = share_data_offset + len(sharedata)
2447         eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
2448
2449         data += struct.pack(MDMFOFFSETS,
2450                             encrypted_private_key_offset,
2451                             sharehashes_offset,
2452                             signature_offset,
2453                             verification_key_offset,
2454                             verification_key_end,
2455                             share_data_offset,
2456                             blockhashes_offset,
2457                             eof_offset)
2458
2459         self.offsets = {}
2460         self.offsets['enc_privkey'] = encrypted_private_key_offset
2461         self.offsets['block_hash_tree'] = blockhashes_offset
2462         self.offsets['share_hash_chain'] = sharehashes_offset
2463         self.offsets['signature'] = signature_offset
2464         self.offsets['verification_key'] = verification_key_offset
2465         self.offsets['share_data'] = share_data_offset
2466         self.offsets['verification_key_end'] = verification_key_end
2467         self.offsets['EOF'] = eof_offset
2468
2469         # the private key,
2470         data += self.encprivkey
2471         # the sharehashes
2472         data += self.share_hash_chain_s
2473         # the signature,
2474         data += self.signature
2475         # and the verification key
2476         data += self.verification_key
2477         # Then we'll add in gibberish until we get to the right point.
2478         nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
2479         data += nulls
2480
2481         # Then the share data
2482         data += sharedata
2483         # the blockhashes
2484         data += self.block_hash_tree_s
2485         return data
2486
2487     def write_test_share_to_server(self,
2488                                    storage_index,
2489                                    tail_segment=False,
2490                                    empty=False):
2491         """
2492         I write some data for the read tests to read to self.aa
2493
2494         If tail_segment=True, then I will write a share that has a
2495         smaller tail segment than other segments.
2496         """
2497         write = self.aa.remote_slot_testv_and_readv_and_writev
2498         data = self.build_test_mdmf_share(tail_segment, empty)
2499         # Finally, we write the whole thing to the storage server in one
2500         # pass.
2501         testvs = [(0, 1, "eq", "")]
2502         tws = {}
2503         tws[0] = (testvs, [(0, data)], None)
2504         readv = [(0, 1)]
2505         d = write(storage_index, self.secrets, tws, readv)
2506         d.addCallback(lambda res: self.failUnless(res[0]))
2507         return d
2508
2509     def build_test_sdmf_share(self, empty=False):
2510         if empty:
2511             sharedata = ""
2512         else:
2513             sharedata = self.segment * 6
2514         self.sharedata = sharedata
2515         blocksize = len(sharedata) / 3
2516         block = sharedata[:blocksize]
2517         self.blockdata = block
2518         prefix = struct.pack(">BQ32s16s BBQQ",
2519                              0, # version,
2520                              0,
2521                              self.root_hash,
2522                              self.salt,
2523                              3,
2524                              10,
2525                              len(sharedata),
2526                              len(sharedata),
2527                             )
2528         post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2529         signature_offset = post_offset + len(self.verification_key)
2530         sharehashes_offset = signature_offset + len(self.signature)
2531         blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
2532         sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
2533         encprivkey_offset = sharedata_offset + len(block)
2534         eof_offset = encprivkey_offset + len(self.encprivkey)
2535         offsets = struct.pack(">LLLLQQ",
2536                               signature_offset,
2537                               sharehashes_offset,
2538                               blockhashes_offset,
2539                               sharedata_offset,
2540                               encprivkey_offset,
2541                               eof_offset)
2542         final_share = "".join([prefix,
2543                            offsets,
2544                            self.verification_key,
2545                            self.signature,
2546                            self.share_hash_chain_s,
2547                            self.block_hash_tree_s,
2548                            block,
2549                            self.encprivkey])
2550         self.offsets = {}
2551         self.offsets['signature'] = signature_offset
2552         self.offsets['share_hash_chain'] = sharehashes_offset
2553         self.offsets['block_hash_tree'] = blockhashes_offset
2554         self.offsets['share_data'] = sharedata_offset
2555         self.offsets['enc_privkey'] = encprivkey_offset
2556         self.offsets['EOF'] = eof_offset
2557         return final_share
2558
2559     def write_sdmf_share_to_server(self,
2560                                    storage_index,
2561                                    empty=False):
2562         # Some tests need SDMF shares to verify that we can still
2563         # read them. This method writes one, which resembles but is not
2564         assert self.rref
2565         write = self.aa.remote_slot_testv_and_readv_and_writev
2566         share = self.build_test_sdmf_share(empty)
2567         testvs = [(0, 1, "eq", "")]
2568         tws = {}
2569         tws[0] = (testvs, [(0, share)], None)
2570         readv = []
2571         d = write(storage_index, self.secrets, tws, readv)
2572         d.addCallback(lambda res: self.failUnless(res[0]))
2573         return d
2574
2575
2576     def test_read(self):
2577         self.init("test_read")
2578
2579         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2580         d = defer.succeed(None)
2581         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2582
2583         # Check that every method equals what we expect it to.
2584         def _check_block_and_salt((block, salt)):
2585             self.failUnlessEqual(block, self.block)
2586             self.failUnlessEqual(salt, self.salt)
2587
2588         for i in xrange(6):
2589             d.addCallback(lambda ignored, i=i:
2590                 mr.get_block_and_salt(i))
2591             d.addCallback(_check_block_and_salt)
2592
2593         d.addCallback(lambda ignored:
2594             mr.get_encprivkey())
2595         d.addCallback(lambda encprivkey:
2596             self.failUnlessEqual(self.encprivkey, encprivkey))
2597
2598         d.addCallback(lambda ignored:
2599             mr.get_blockhashes())
2600         d.addCallback(lambda blockhashes:
2601             self.failUnlessEqual(self.block_hash_tree, blockhashes))
2602
2603         d.addCallback(lambda ignored:
2604             mr.get_sharehashes())
2605         d.addCallback(lambda sharehashes:
2606             self.failUnlessEqual(self.share_hash_chain, sharehashes))
2607
2608         d.addCallback(lambda ignored:
2609             mr.get_signature())
2610         d.addCallback(lambda signature:
2611             self.failUnlessEqual(signature, self.signature))
2612
2613         d.addCallback(lambda ignored:
2614             mr.get_verification_key())
2615         d.addCallback(lambda verification_key:
2616             self.failUnlessEqual(verification_key, self.verification_key))
2617
2618         d.addCallback(lambda ignored:
2619             mr.get_seqnum())
2620         d.addCallback(lambda seqnum:
2621             self.failUnlessEqual(seqnum, 0))
2622
2623         d.addCallback(lambda ignored:
2624             mr.get_root_hash())
2625         d.addCallback(lambda root_hash:
2626             self.failUnlessEqual(self.root_hash, root_hash))
2627
2628         d.addCallback(lambda ignored:
2629             mr.get_seqnum())
2630         d.addCallback(lambda seqnum:
2631             self.failUnlessEqual(0, seqnum))
2632
2633         d.addCallback(lambda ignored:
2634             mr.get_encoding_parameters())
2635         def _check_encoding_parameters((k, n, segsize, datalen)):
2636             self.failUnlessEqual(k, 3)
2637             self.failUnlessEqual(n, 10)
2638             self.failUnlessEqual(segsize, 6)
2639             self.failUnlessEqual(datalen, 36)
2640         d.addCallback(_check_encoding_parameters)
2641
2642         d.addCallback(lambda ignored:
2643             mr.get_checkstring())
2644         d.addCallback(lambda checkstring:
2645             self.failUnlessEqual(checkstring, checkstring))
2646         return d
2647
2648     def test_read_with_different_tail_segment_size(self):
2649         self.init("test_read_with_different_tail_segment_size")
2650
2651         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2652         d = defer.succeed(None)
2653         d.addCallback(lambda ign: self.write_test_share_to_server("si1", tail_segment=True))
2654
2655         d.addCallback(lambda ign: mr.get_block_and_salt(5))
2656         def _check_tail_segment(results):
2657             block, salt = results
2658             self.failUnlessEqual(len(block), 1)
2659             self.failUnlessEqual(block, "a")
2660         d.addCallback(_check_tail_segment)
2661         return d
2662
2663     def test_get_block_with_invalid_segnum(self):
2664         self.init("test_get_block_with_invalid_segnum")
2665
2666         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2667         d = defer.succeed(None)
2668         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2669         d.addCallback(lambda ignored:
2670             self.shouldFail(LayoutInvalid, "test invalid segnum",
2671                             None,
2672                             mr.get_block_and_salt, 7))
2673         return d
2674
2675     def test_get_encoding_parameters_first(self):
2676         self.init("test_get_encoding_parameters_first")
2677
2678         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2679         d = defer.succeed(None)
2680         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2681         d.addCallback(lambda ign: mr.get_encoding_parameters())
2682         def _check_encoding_parameters((k, n, segment_size, datalen)):
2683             self.failUnlessEqual(k, 3)
2684             self.failUnlessEqual(n, 10)
2685             self.failUnlessEqual(segment_size, 6)
2686             self.failUnlessEqual(datalen, 36)
2687         d.addCallback(_check_encoding_parameters)
2688         return d
2689
2690     def test_get_seqnum_first(self):
2691         self.init("test_get_seqnum_first")
2692
2693         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2694         d = defer.succeed(None)
2695         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2696         d.addCallback(lambda ign: mr.get_seqnum())
2697         d.addCallback(lambda seqnum:
2698             self.failUnlessEqual(seqnum, 0))
2699         return d
2700
2701     def test_get_root_hash_first(self):
2702         self.init("test_root_hash_first")
2703
2704         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2705         d = defer.succeed(None)
2706         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2707         d.addCallback(lambda ign: mr.get_root_hash())
2708         d.addCallback(lambda root_hash:
2709             self.failUnlessEqual(root_hash, self.root_hash))
2710         return d
2711
2712     def test_get_checkstring_first(self):
2713         self.init("test_checkstring_first")
2714
2715         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2716         d = defer.succeed(None)
2717         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2718         d.addCallback(lambda ign: mr.get_checkstring())
2719         d.addCallback(lambda checkstring:
2720             self.failUnlessEqual(checkstring, self.checkstring))
2721         return d
2722
2723     def test_write_read_vectors(self):
2724         self.init("test_write_read_vectors")
2725
2726         # When writing for us, the storage server will return to us a
2727         # read vector, along with its result. If a write fails because
2728         # the test vectors failed, this read vector can help us to
2729         # diagnose the problem. This test ensures that the read vector
2730         # is working appropriately.
2731         mw = self._make_new_mw("si1", 0)
2732
2733         for i in xrange(6):
2734             mw.put_block(self.block, i, self.salt)
2735         mw.put_encprivkey(self.encprivkey)
2736         mw.put_blockhashes(self.block_hash_tree)
2737         mw.put_sharehashes(self.share_hash_chain)
2738         mw.put_root_hash(self.root_hash)
2739         mw.put_signature(self.signature)
2740         mw.put_verification_key(self.verification_key)
2741
2742         d = mw.finish_publishing()
2743         def _then(results):
2744             self.failUnless(len(results), 2)
2745             result, readv = results
2746             self.failUnless(result)
2747             self.failIf(readv)
2748             self.old_checkstring = mw.get_checkstring()
2749             mw.set_checkstring("")
2750         d.addCallback(_then)
2751         d.addCallback(lambda ignored:
2752             mw.finish_publishing())
2753         def _then_again(results):
2754             self.failUnlessEqual(len(results), 2)
2755             result, readvs = results
2756             self.failIf(result)
2757             self.failUnlessIn(0, readvs)
2758             readv = readvs[0][0]
2759             self.failUnlessEqual(readv, self.old_checkstring)
2760         d.addCallback(_then_again)
2761         # The checkstring remains the same for the rest of the process.
2762         return d
2763
2764     def test_private_key_after_share_hash_chain(self):
2765         self.init("test_private_key_after_share_hash_chain")
2766
2767         mw = self._make_new_mw("si1", 0)
2768         d = defer.succeed(None)
2769         for i in xrange(6):
2770             d.addCallback(lambda ignored, i=i:
2771                 mw.put_block(self.block, i, self.salt))
2772         d.addCallback(lambda ignored:
2773             mw.put_encprivkey(self.encprivkey))
2774         d.addCallback(lambda ignored:
2775             mw.put_sharehashes(self.share_hash_chain))
2776
2777         # Now try to put the private key again.
2778         d.addCallback(lambda ignored:
2779             self.shouldFail(LayoutInvalid, "test repeat private key",
2780                             None,
2781                             mw.put_encprivkey, self.encprivkey))
2782         return d
2783
2784     def test_signature_after_verification_key(self):
2785         self.init("test_signature_after_verification_key")
2786
2787         mw = self._make_new_mw("si1", 0)
2788         d = defer.succeed(None)
2789         # Put everything up to and including the verification key.
2790         for i in xrange(6):
2791             d.addCallback(lambda ignored, i=i:
2792                 mw.put_block(self.block, i, self.salt))
2793         d.addCallback(lambda ignored:
2794             mw.put_encprivkey(self.encprivkey))
2795         d.addCallback(lambda ignored:
2796             mw.put_blockhashes(self.block_hash_tree))
2797         d.addCallback(lambda ignored:
2798             mw.put_sharehashes(self.share_hash_chain))
2799         d.addCallback(lambda ignored:
2800             mw.put_root_hash(self.root_hash))
2801         d.addCallback(lambda ignored:
2802             mw.put_signature(self.signature))
2803         d.addCallback(lambda ignored:
2804             mw.put_verification_key(self.verification_key))
2805         # Now try to put the signature again. This should fail
2806         d.addCallback(lambda ignored:
2807             self.shouldFail(LayoutInvalid, "signature after verification",
2808                             None,
2809                             mw.put_signature, self.signature))
2810         return d
2811
2812     def test_uncoordinated_write(self):
2813         self.init("test_uncoordinated_write")
2814
2815         # Make two mutable writers, both pointing to the same storage
2816         # server, both at the same storage index, and try writing to the
2817         # same share.
2818         mw1 = self._make_new_mw("si1", 0)
2819         mw2 = self._make_new_mw("si1", 0)
2820
2821         def _check_success(results):
2822             result, readvs = results
2823             self.failUnless(result)
2824
2825         def _check_failure(results):
2826             result, readvs = results
2827             self.failIf(result)
2828
2829         def _write_share(mw):
2830             for i in xrange(6):
2831                 mw.put_block(self.block, i, self.salt)
2832             mw.put_encprivkey(self.encprivkey)
2833             mw.put_blockhashes(self.block_hash_tree)
2834             mw.put_sharehashes(self.share_hash_chain)
2835             mw.put_root_hash(self.root_hash)
2836             mw.put_signature(self.signature)
2837             mw.put_verification_key(self.verification_key)
2838             return mw.finish_publishing()
2839         d = _write_share(mw1)
2840         d.addCallback(_check_success)
2841         d.addCallback(lambda ignored:
2842             _write_share(mw2))
2843         d.addCallback(_check_failure)
2844         return d
2845
2846     def test_invalid_salt_size(self):
2847         self.init("test_invalid_salt_size")
2848
2849         # Salts need to be 16 bytes in size. Writes that attempt to
2850         # write more or less than this should be rejected.
2851         mw = self._make_new_mw("si1", 0)
2852         invalid_salt = "a" * 17 # 17 bytes
2853         another_invalid_salt = "b" * 15 # 15 bytes
2854         d = defer.succeed(None)
2855         d.addCallback(lambda ignored:
2856             self.shouldFail(LayoutInvalid, "salt too big",
2857                             None,
2858                             mw.put_block, self.block, 0, invalid_salt))
2859         d.addCallback(lambda ignored:
2860             self.shouldFail(LayoutInvalid, "salt too small",
2861                             None,
2862                             mw.put_block, self.block, 0,
2863                             another_invalid_salt))
2864         return d
2865
2866     def test_write_test_vectors(self):
2867         self.init("test_write_test_vectors")
2868
2869         # If we give the write proxy a bogus test vector at
2870         # any point during the process, it should fail to write when we
2871         # tell it to write.
2872         def _check_failure(results):
2873             self.failUnlessEqual(len(results), 2)
2874             res, d = results
2875             self.failIf(res)
2876
2877         def _check_success(results):
2878             self.failUnlessEqual(len(results), 2)
2879             res, d = results
2880             self.failUnless(results)
2881
2882         mw = self._make_new_mw("si1", 0)
2883         mw.set_checkstring("this is a lie")
2884         for i in xrange(6):
2885             mw.put_block(self.block, i, self.salt)
2886         mw.put_encprivkey(self.encprivkey)
2887         mw.put_blockhashes(self.block_hash_tree)
2888         mw.put_sharehashes(self.share_hash_chain)
2889         mw.put_root_hash(self.root_hash)
2890         mw.put_signature(self.signature)
2891         mw.put_verification_key(self.verification_key)
2892         d = mw.finish_publishing()
2893         d.addCallback(_check_failure)
2894         d.addCallback(lambda ignored:
2895             mw.set_checkstring(""))
2896         d.addCallback(lambda ignored:
2897             mw.finish_publishing())
2898         d.addCallback(_check_success)
2899         return d
2900
2901
2902     def serialize_blockhashes(self, blockhashes):
2903         return "".join(blockhashes)
2904
2905     def serialize_sharehashes(self, sharehashes):
2906         ret = "".join([struct.pack(">H32s", i, sharehashes[i])
2907                         for i in sorted(sharehashes.keys())])
2908         return ret
2909
2910
2911     def test_write(self):
2912         self.init("test_write")
2913
2914         # This translates to a file with 6 6-byte segments, and with 2-byte
2915         # blocks.
2916         mw = self._make_new_mw("si1", 0)
2917         # Test writing some blocks.
2918         read = self.aa.remote_slot_readv
2919         expected_private_key_offset = struct.calcsize(MDMFHEADER)
2920         expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
2921                                     PRIVATE_KEY_SIZE + \
2922                                     SIGNATURE_SIZE + \
2923                                     VERIFICATION_KEY_SIZE + \
2924                                     SHARE_HASH_CHAIN_SIZE
2925         written_block_size = 2 + len(self.salt)
2926         written_block = self.block + self.salt
2927         for i in xrange(6):
2928             mw.put_block(self.block, i, self.salt)
2929
2930         mw.put_encprivkey(self.encprivkey)
2931         mw.put_blockhashes(self.block_hash_tree)
2932         mw.put_sharehashes(self.share_hash_chain)
2933         mw.put_root_hash(self.root_hash)
2934         mw.put_signature(self.signature)
2935         mw.put_verification_key(self.verification_key)
2936
2937         d = mw.finish_publishing()
2938         d.addCallback(lambda (result, ign): self.failUnless(result, "publish failed"))
2939
2940         for i in xrange(6):
2941             d.addCallback(lambda ign, i=i: read("si1", [0],
2942                                                 [(expected_sharedata_offset + (i * written_block_size),
2943                                                   written_block_size)]))
2944             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [written_block]}))
2945
2946             d.addCallback(lambda ign: self.failUnlessEqual(len(self.encprivkey), 7))
2947             d.addCallback(lambda ign: read("si1", [0], [(expected_private_key_offset, 7)]))
2948             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.encprivkey]}))
2949
2950             expected_block_hash_offset = expected_sharedata_offset + (6 * written_block_size)
2951             d.addCallback(lambda ign: self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6))
2952             d.addCallback(lambda ign, ebho=expected_block_hash_offset:
2953                                       read("si1", [0], [(ebho, 32 * 6)]))
2954             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.block_hash_tree_s]}))
2955
2956             expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
2957             d.addCallback(lambda ign, esho=expected_share_hash_offset:
2958                                       read("si1", [0], [(esho, (32 + 2) * 6)]))
2959             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.share_hash_chain_s]}))
2960
2961             d.addCallback(lambda ign: read("si1", [0], [(9, 32)]))
2962             d.addCallback(lambda res: self.failUnlessEqual(res,  {0: [self.root_hash]}))
2963
2964             expected_signature_offset = expected_share_hash_offset + len(self.share_hash_chain_s)
2965             d.addCallback(lambda ign: self.failUnlessEqual(len(self.signature), 9))
2966             d.addCallback(lambda ign, esigo=expected_signature_offset:
2967                                       read("si1", [0], [(esigo, 9)]))
2968             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.signature]}))
2969
2970             expected_verification_key_offset = expected_signature_offset + len(self.signature)
2971             d.addCallback(lambda ign: self.failUnlessEqual(len(self.verification_key), 6))
2972             d.addCallback(lambda ign, evko=expected_verification_key_offset:
2973                                       read("si1", [0], [(evko, 6)]))
2974             d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.verification_key]}))
2975
2976             def _check_other_fields(ign, ebho=expected_block_hash_offset,
2977                                          esho=expected_share_hash_offset,
2978                                          esigo=expected_signature_offset,
2979                                          evko=expected_verification_key_offset):
2980                 signable = mw.get_signable()
2981                 verno, seq, roothash, k, N, segsize, datalen = struct.unpack(">BQ32sBBQQ",
2982                                                                              signable)
2983                 self.failUnlessEqual(verno, 1)
2984                 self.failUnlessEqual(seq, 0)
2985                 self.failUnlessEqual(roothash, self.root_hash)
2986                 self.failUnlessEqual(k, 3)
2987                 self.failUnlessEqual(N, 10)
2988                 self.failUnlessEqual(segsize, 6)
2989                 self.failUnlessEqual(datalen, 36)
2990
2991                 def _check_field(res, offset, fmt, which, value):
2992                     encoded = struct.pack(fmt, value)
2993                     d3 = defer.succeed(None)
2994                     d3.addCallback(lambda ign: read("si1", [0], [(offset, len(encoded))]))
2995                     d3.addCallback(lambda res: self.failUnlessEqual(res, {0: [encoded]}, which))
2996                     return d3
2997
2998                 d2 = defer.succeed(None)
2999                 d2.addCallback(_check_field,   0, ">B", "version number", verno)
3000                 d2.addCallback(_check_field,   1, ">Q", "sequence number", seq)
3001                 d2.addCallback(_check_field,  41, ">B", "k", k)
3002                 d2.addCallback(_check_field,  42, ">B", "N", N)
3003                 d2.addCallback(_check_field,  43, ">Q", "segment size", segsize)
3004                 d2.addCallback(_check_field,  51, ">Q", "data length", datalen)
3005                 d2.addCallback(_check_field,  59, ">Q", "private key offset",
3006                                              expected_private_key_offset)
3007                 d2.addCallback(_check_field,  67, ">Q", "share hash offset", esho)
3008                 d2.addCallback(_check_field,  75, ">Q", "signature offset", esigo)
3009                 d2.addCallback(_check_field,  83, ">Q", "verification key offset", evko)
3010                 d2.addCallback(_check_field,  91, ">Q", "end of verification key",
3011                                              evko + len(self.verification_key))
3012                 d2.addCallback(_check_field,  99, ">Q", "sharedata offset",
3013                                              expected_sharedata_offset)
3014                 d2.addCallback(_check_field, 107, ">Q", "block hash offset", ebho)
3015                 d2.addCallback(_check_field, 115, ">Q", "eof offset",
3016                                              ebho + len(self.block_hash_tree_s))
3017                 return d2
3018             d.addCallback(_check_other_fields)
3019
3020         return d
3021
3022
3023     def _make_new_mw(self, si, share, datalength=36):
3024         # This is a file of size 36 bytes. Since it has a segment
3025         # size of 6, we know that it has 6 byte segments, which will
3026         # be split into blocks of 2 bytes because our FEC k
3027         # parameter is 3.
3028         mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
3029                                 6, datalength)
3030         return mw
3031
3032     def test_write_rejected_with_too_many_blocks(self):
3033         self.init("test_write_rejected_with_too_many_blocks")
3034
3035         mw = self._make_new_mw("si0", 0)
3036
3037         # Try writing too many blocks. We should not be able to write
3038         # more than 6
3039         # blocks into each share.
3040         d = defer.succeed(None)
3041         for i in xrange(6):
3042             d.addCallback(lambda ignored, i=i:
3043                 mw.put_block(self.block, i, self.salt))
3044         d.addCallback(lambda ignored:
3045             self.shouldFail(LayoutInvalid, "too many blocks",
3046                             None,
3047                             mw.put_block, self.block, 7, self.salt))
3048         return d
3049
3050     def test_write_rejected_with_invalid_salt(self):
3051         self.init("test_write_rejected_with_invalid_salt")
3052
3053         # Try writing an invalid salt. Salts are 16 bytes -- any more or
3054         # less should cause an error.
3055         mw = self._make_new_mw("si1", 0)
3056         bad_salt = "a" * 17 # 17 bytes
3057         d = defer.succeed(None)
3058         d.addCallback(lambda ignored:
3059             self.shouldFail(LayoutInvalid, "test_invalid_salt",
3060                             None, mw.put_block, self.block, 7, bad_salt))
3061         return d
3062
3063     def test_write_rejected_with_invalid_root_hash(self):
3064         self.init("test_write_rejected_with_invalid_root_hash")
3065
3066         # Try writing an invalid root hash. This should be SHA256d, and
3067         # 32 bytes long as a result.
3068         mw = self._make_new_mw("si2", 0)
3069         # 17 bytes != 32 bytes
3070         invalid_root_hash = "a" * 17
3071         d = defer.succeed(None)
3072         # Before this test can work, we need to put some blocks + salts,
3073         # a block hash tree, and a share hash tree. Otherwise, we'll see
3074         # failures that match what we are looking for, but are caused by
3075         # the constraints imposed on operation ordering.
3076         for i in xrange(6):
3077             d.addCallback(lambda ignored, i=i:
3078                 mw.put_block(self.block, i, self.salt))
3079         d.addCallback(lambda ignored:
3080             mw.put_encprivkey(self.encprivkey))
3081         d.addCallback(lambda ignored:
3082             mw.put_blockhashes(self.block_hash_tree))
3083         d.addCallback(lambda ignored:
3084             mw.put_sharehashes(self.share_hash_chain))
3085         d.addCallback(lambda ignored:
3086             self.shouldFail(LayoutInvalid, "invalid root hash",
3087                             None, mw.put_root_hash, invalid_root_hash))
3088         return d
3089
3090     def test_write_rejected_with_invalid_blocksize(self):
3091         self.init("test_write_rejected_with_invalid_blocksize")
3092
3093         # The blocksize implied by the writer that we get from
3094         # _make_new_mw is 2bytes -- any more or any less than this
3095         # should be cause for failure, unless it is the tail segment, in
3096         # which case it may not be failure.
3097         invalid_block = "a"
3098         mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
3099                                              # one byte blocks
3100         # 1 bytes != 2 bytes
3101         d = defer.succeed(None)
3102         d.addCallback(lambda ignored, invalid_block=invalid_block:
3103             self.shouldFail(LayoutInvalid, "test blocksize too small",
3104                             None, mw.put_block, invalid_block, 0,
3105                             self.salt))
3106         invalid_block = invalid_block * 3
3107         # 3 bytes != 2 bytes
3108         d.addCallback(lambda ignored:
3109             self.shouldFail(LayoutInvalid, "test blocksize too large",
3110                             None,
3111                             mw.put_block, invalid_block, 0, self.salt))
3112         for i in xrange(5):
3113             d.addCallback(lambda ignored, i=i:
3114                 mw.put_block(self.block, i, self.salt))
3115         # Try to put an invalid tail segment
3116         d.addCallback(lambda ignored:
3117             self.shouldFail(LayoutInvalid, "test invalid tail segment",
3118                             None,
3119                             mw.put_block, self.block, 5, self.salt))
3120         valid_block = "a"
3121         d.addCallback(lambda ignored:
3122             mw.put_block(valid_block, 5, self.salt))
3123         return d
3124
3125     def test_write_enforces_order_constraints(self):
3126         self.init("test_write_enforces_order_constraints")
3127
3128         # We require that the MDMFSlotWriteProxy be interacted with in a
3129         # specific way.
3130         # That way is:
3131         # 0: __init__
3132         # 1: write blocks and salts
3133         # 2: Write the encrypted private key
3134         # 3: Write the block hashes
3135         # 4: Write the share hashes
3136         # 5: Write the root hash and salt hash
3137         # 6: Write the signature and verification key
3138         # 7: Write the file.
3139         #
3140         # Some of these can be performed out-of-order, and some can't.
3141         # The dependencies that I want to test here are:
3142         #  - Private key before block hashes
3143         #  - share hashes and block hashes before root hash
3144         #  - root hash before signature
3145         #  - signature before verification key
3146         mw0 = self._make_new_mw("si0", 0)
3147         # Write some shares
3148         d = defer.succeed(None)
3149         for i in xrange(6):
3150             d.addCallback(lambda ign, i=i:
3151                           mw0.put_block(self.block, i, self.salt))
3152
3153         # Try to write the share hash chain without writing the
3154         # encrypted private key
3155         d.addCallback(lambda ignored:
3156             self.shouldFail(LayoutInvalid, "share hash chain before "
3157                                            "private key",
3158                             None,
3159                             lambda: mw0.put_sharehashes(self.share_hash_chain) ))
3160
3161         # Write the private key.
3162         d.addCallback(lambda ign: mw0.put_encprivkey(self.encprivkey))
3163
3164         # Now write the block hashes and try again
3165         d.addCallback(lambda ignored:
3166             mw0.put_blockhashes(self.block_hash_tree))
3167
3168         # We haven't yet put the root hash on the share, so we shouldn't
3169         # be able to sign it.
3170         d.addCallback(lambda ignored:
3171             self.shouldFail(LayoutInvalid, "signature before root hash",
3172                             None,
3173                             lambda: mw0.put_signature(self.signature) ))
3174
3175         d.addCallback(lambda ignored:
3176             self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
3177
3178         # ..and, since that fails, we also shouldn't be able to put the
3179         # verification key.
3180         d.addCallback(lambda ignored:
3181             self.shouldFail(LayoutInvalid, "key before signature",
3182                             None,
3183                             lambda: mw0.put_verification_key(self.verification_key) ))
3184
3185         # Now write the share hashes.
3186         d.addCallback(lambda ign: mw0.put_sharehashes(self.share_hash_chain))
3187
3188         # We should be able to write the root hash now too
3189         d.addCallback(lambda ign: mw0.put_root_hash(self.root_hash))
3190
3191         # We should still be unable to put the verification key
3192         d.addCallback(lambda ignored:
3193             self.shouldFail(LayoutInvalid, "key before signature",
3194                             None,
3195                             lambda: mw0.put_verification_key(self.verification_key) ))
3196
3197         d.addCallback(lambda ign: mw0.put_signature(self.signature))
3198
3199         # We shouldn't be able to write the offsets to the remote server
3200         # until the offset table is finished; IOW, until we have written
3201         # the verification key.
3202         d.addCallback(lambda ignored:
3203             self.shouldFail(LayoutInvalid, "offsets before verification key",
3204                             None,
3205                             mw0.finish_publishing))
3206
3207         d.addCallback(lambda ignored:
3208             mw0.put_verification_key(self.verification_key))
3209         return d
3210
3211     def test_end_to_end(self):
3212         self.init("test_end_to_end")
3213
3214         mw = self._make_new_mw("si1", 0)
3215         # Write a share using the mutable writer, and make sure that the
3216         # reader knows how to read everything back to us.
3217         d = defer.succeed(None)
3218         for i in xrange(6):
3219             d.addCallback(lambda ignored, i=i:
3220                 mw.put_block(self.block, i, self.salt))
3221         d.addCallback(lambda ignored:
3222             mw.put_encprivkey(self.encprivkey))
3223         d.addCallback(lambda ignored:
3224             mw.put_blockhashes(self.block_hash_tree))
3225         d.addCallback(lambda ignored:
3226             mw.put_sharehashes(self.share_hash_chain))
3227         d.addCallback(lambda ignored:
3228             mw.put_root_hash(self.root_hash))
3229         d.addCallback(lambda ignored:
3230             mw.put_signature(self.signature))
3231         d.addCallback(lambda ignored:
3232             mw.put_verification_key(self.verification_key))
3233         d.addCallback(lambda ignored:
3234             mw.finish_publishing())
3235
3236         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3237         def _check_block_and_salt((block, salt)):
3238             self.failUnlessEqual(block, self.block)
3239             self.failUnlessEqual(salt, self.salt)
3240
3241         for i in xrange(6):
3242             d.addCallback(lambda ignored, i=i:
3243                 mr.get_block_and_salt(i))
3244             d.addCallback(_check_block_and_salt)
3245
3246         d.addCallback(lambda ignored:
3247             mr.get_encprivkey())
3248         d.addCallback(lambda encprivkey:
3249             self.failUnlessEqual(self.encprivkey, encprivkey))
3250
3251         d.addCallback(lambda ignored:
3252             mr.get_blockhashes())
3253         d.addCallback(lambda blockhashes:
3254             self.failUnlessEqual(self.block_hash_tree, blockhashes))
3255
3256         d.addCallback(lambda ignored:
3257             mr.get_sharehashes())
3258         d.addCallback(lambda sharehashes:
3259             self.failUnlessEqual(self.share_hash_chain, sharehashes))
3260
3261         d.addCallback(lambda ignored:
3262             mr.get_signature())
3263         d.addCallback(lambda signature:
3264             self.failUnlessEqual(signature, self.signature))
3265
3266         d.addCallback(lambda ignored:
3267             mr.get_verification_key())
3268         d.addCallback(lambda verification_key:
3269             self.failUnlessEqual(verification_key, self.verification_key))
3270
3271         d.addCallback(lambda ignored:
3272             mr.get_seqnum())
3273         d.addCallback(lambda seqnum:
3274             self.failUnlessEqual(seqnum, 0))
3275
3276         d.addCallback(lambda ignored:
3277             mr.get_root_hash())
3278         d.addCallback(lambda root_hash:
3279             self.failUnlessEqual(self.root_hash, root_hash))
3280
3281         d.addCallback(lambda ignored:
3282             mr.get_encoding_parameters())
3283         def _check_encoding_parameters((k, n, segsize, datalen)):
3284             self.failUnlessEqual(k, 3)
3285             self.failUnlessEqual(n, 10)
3286             self.failUnlessEqual(segsize, 6)
3287             self.failUnlessEqual(datalen, 36)
3288         d.addCallback(_check_encoding_parameters)
3289
3290         d.addCallback(lambda ignored:
3291             mr.get_checkstring())
3292         d.addCallback(lambda checkstring:
3293             self.failUnlessEqual(checkstring, mw.get_checkstring()))
3294         return d
3295
3296     def test_is_sdmf(self):
3297         self.init("test_is_sdmf")
3298
3299         # The MDMFSlotReadProxy should also know how to read SDMF files,
3300         # since it will encounter them on the grid. Callers use the
3301         # is_sdmf method to test this.
3302         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3303         d = defer.succeed(None)
3304         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3305         d.addCallback(lambda ign: mr.is_sdmf())
3306         d.addCallback(lambda issdmf: self.failUnless(issdmf))
3307         return d
3308
3309     def test_reads_sdmf(self):
3310         self.init("test_reads_sdmf")
3311
3312         # The slot read proxy should, naturally, know how to tell us
3313         # about data in the SDMF format
3314         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3315         d = defer.succeed(None)
3316         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3317         d.addCallback(lambda ign: mr.is_sdmf())
3318         d.addCallback(lambda issdmf: self.failUnless(issdmf))
3319
3320         # What do we need to read?
3321         #  - The sharedata
3322         #  - The salt
3323         d.addCallback(lambda ignored:
3324             mr.get_block_and_salt(0))
3325         def _check_block_and_salt(results):
3326             block, salt = results
3327             # Our original file is 36 bytes long. Then each share is 12
3328             # bytes in size. The share is composed entirely of the
3329             # letter a. self.block contains 2 as, so 6 * self.block is
3330             # what we are looking for.
3331             self.failUnlessEqual(block, self.block * 6)
3332             self.failUnlessEqual(salt, self.salt)
3333         d.addCallback(_check_block_and_salt)
3334
3335         #  - The blockhashes
3336         d.addCallback(lambda ignored:
3337             mr.get_blockhashes())
3338         d.addCallback(lambda blockhashes:
3339             self.failUnlessEqual(self.block_hash_tree,
3340                                  blockhashes,
3341                                  blockhashes))
3342         #  - The sharehashes
3343         d.addCallback(lambda ignored:
3344             mr.get_sharehashes())
3345         d.addCallback(lambda sharehashes:
3346             self.failUnlessEqual(self.share_hash_chain,
3347                                  sharehashes))
3348         #  - The keys
3349         d.addCallback(lambda ignored:
3350             mr.get_encprivkey())
3351         d.addCallback(lambda encprivkey:
3352             self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
3353         d.addCallback(lambda ignored:
3354             mr.get_verification_key())
3355         d.addCallback(lambda verification_key:
3356             self.failUnlessEqual(verification_key,
3357                                  self.verification_key,
3358                                  verification_key))
3359         #  - The signature
3360         d.addCallback(lambda ignored:
3361             mr.get_signature())
3362         d.addCallback(lambda signature:
3363             self.failUnlessEqual(signature, self.signature, signature))
3364
3365         #  - The sequence number
3366         d.addCallback(lambda ignored:
3367             mr.get_seqnum())
3368         d.addCallback(lambda seqnum:
3369             self.failUnlessEqual(seqnum, 0, seqnum))
3370
3371         #  - The root hash
3372         d.addCallback(lambda ignored:
3373             mr.get_root_hash())
3374         d.addCallback(lambda root_hash:
3375             self.failUnlessEqual(root_hash, self.root_hash, root_hash))
3376         return d
3377
3378     def test_only_reads_one_segment_sdmf(self):
3379         self.init("test_only_reads_one_segment_sdmf")
3380
3381         # SDMF shares have only one segment, so it doesn't make sense to
3382         # read more segments than that. The reader should know this and
3383         # complain if we try to do that.
3384         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3385         d = defer.succeed(None)
3386         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3387         d.addCallback(lambda ign: mr.is_sdmf())
3388         d.addCallback(lambda issdmf: self.failUnless(issdmf))
3389         d.addCallback(lambda ignored:
3390             self.shouldFail(LayoutInvalid, "test bad segment",
3391                             None,
3392                             mr.get_block_and_salt, 1))
3393         return d
3394
3395     def test_read_with_prefetched_mdmf_data(self):
3396         self.init("test_read_with_prefetched_mdmf_data")
3397
3398         # The MDMFSlotReadProxy will prefill certain fields if you pass
3399         # it data that you have already fetched. This is useful for
3400         # cases like the Servermap, which prefetches ~2kb of data while
3401         # finding out which shares are on the remote peer so that it
3402         # doesn't waste round trips.
3403         mdmf_data = self.build_test_mdmf_share()
3404         def _make_mr(ignored, length):
3405             mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
3406             return mr
3407
3408         d = defer.succeed(None)
3409         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
3410
3411         # This should be enough to fill in both the encoding parameters
3412         # and the table of offsets, which will complete the version
3413         # information tuple.
3414         d.addCallback(_make_mr, 123)
3415         d.addCallback(lambda mr:
3416             mr.get_verinfo())
3417         def _check_verinfo(verinfo):
3418             self.failUnless(verinfo)
3419             self.failUnlessEqual(len(verinfo), 9)
3420             (seqnum,
3421              root_hash,
3422              salt_hash,
3423              segsize,
3424              datalen,
3425              k,
3426              n,
3427              prefix,
3428              offsets) = verinfo
3429             self.failUnlessEqual(seqnum, 0)
3430             self.failUnlessEqual(root_hash, self.root_hash)
3431             self.failUnlessEqual(segsize, 6)
3432             self.failUnlessEqual(datalen, 36)
3433             self.failUnlessEqual(k, 3)
3434             self.failUnlessEqual(n, 10)
3435             expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
3436                                           1,
3437                                           seqnum,
3438                                           root_hash,
3439                                           k,
3440                                           n,
3441                                           segsize,
3442                                           datalen)
3443             self.failUnlessEqual(expected_prefix, prefix)
3444             self.failUnlessEqual(self.rref.read_count, 0)
3445         d.addCallback(_check_verinfo)
3446
3447         # This is not enough data to read a block and a share, so the
3448         # wrapper should attempt to read this from the remote server.
3449         d.addCallback(_make_mr, 123)
3450         d.addCallback(lambda mr:
3451             mr.get_block_and_salt(0))
3452         def _check_block_and_salt((block, salt)):
3453             self.failUnlessEqual(block, self.block)
3454             self.failUnlessEqual(salt, self.salt)
3455             self.failUnlessEqual(self.rref.read_count, 1)
3456
3457         # This should be enough data to read one block.
3458         d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
3459         d.addCallback(lambda mr:
3460             mr.get_block_and_salt(0))
3461         d.addCallback(_check_block_and_salt)
3462         return d
3463
3464     def test_read_with_prefetched_sdmf_data(self):
3465         self.init("test_read_with_prefetched_sdmf_data")
3466
3467         sdmf_data = self.build_test_sdmf_share()
3468         def _make_mr(ignored, length):
3469             mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
3470             return mr
3471
3472         d = defer.succeed(None)
3473         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3474
3475         # This should be enough to get us the encoding parameters,
3476         # offset table, and everything else we need to build a verinfo
3477         # string.
3478         d.addCallback(_make_mr, 123)
3479         d.addCallback(lambda mr:
3480             mr.get_verinfo())
3481         def _check_verinfo(verinfo):
3482             self.failUnless(verinfo)
3483             self.failUnlessEqual(len(verinfo), 9)
3484             (seqnum,
3485              root_hash,
3486              salt,
3487              segsize,
3488              datalen,
3489              k,
3490              n,
3491              prefix,
3492              offsets) = verinfo
3493             self.failUnlessEqual(seqnum, 0)
3494             self.failUnlessEqual(root_hash, self.root_hash)
3495             self.failUnlessEqual(salt, self.salt)
3496             self.failUnlessEqual(segsize, 36)
3497             self.failUnlessEqual(datalen, 36)
3498             self.failUnlessEqual(k, 3)
3499             self.failUnlessEqual(n, 10)
3500             expected_prefix = struct.pack(SIGNED_PREFIX,
3501                                           0,
3502                                           seqnum,
3503                                           root_hash,
3504                                           salt,
3505                                           k,
3506                                           n,
3507                                           segsize,
3508                                           datalen)
3509             self.failUnlessEqual(expected_prefix, prefix)
3510             self.failUnlessEqual(self.rref.read_count, 0)
3511         d.addCallback(_check_verinfo)
3512         # This shouldn't be enough to read any share data.
3513         d.addCallback(_make_mr, 123)
3514         d.addCallback(lambda mr:
3515             mr.get_block_and_salt(0))
3516         def _check_block_and_salt((block, salt)):
3517             self.failUnlessEqual(block, self.block * 6)
3518             self.failUnlessEqual(salt, self.salt)
3519             # TODO: Fix the read routine so that it reads only the data
3520             #       that it has cached if it can't read all of it.
3521             self.failUnlessEqual(self.rref.read_count, 2)
3522
3523         # This should be enough to read share data.
3524         d.addCallback(_make_mr, self.offsets['share_data'])
3525         d.addCallback(lambda mr:
3526             mr.get_block_and_salt(0))
3527         d.addCallback(_check_block_and_salt)
3528         return d
3529
3530     def test_read_with_empty_mdmf_file(self):
3531         self.init("test_read_with_empty_mdmf_file")
3532
3533         # Some tests upload a file with no contents to test things
3534         # unrelated to the actual handling of the content of the file.
3535         # The reader should behave intelligently in these cases.
3536         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3537         d = defer.succeed(None)
3538         d.addCallback(lambda ign: self.write_test_share_to_server("si1", empty=True))
3539         # We should be able to get the encoding parameters, and they
3540         # should be correct.
3541         d.addCallback(lambda ignored:
3542             mr.get_encoding_parameters())
3543         def _check_encoding_parameters(params):
3544             self.failUnlessEqual(len(params), 4)
3545             k, n, segsize, datalen = params
3546             self.failUnlessEqual(k, 3)
3547             self.failUnlessEqual(n, 10)
3548             self.failUnlessEqual(segsize, 0)
3549             self.failUnlessEqual(datalen, 0)
3550         d.addCallback(_check_encoding_parameters)
3551
3552         # We should not be able to fetch a block, since there are no
3553         # blocks to fetch
3554         d.addCallback(lambda ignored:
3555             self.shouldFail(LayoutInvalid, "get block on empty file",
3556                             None,
3557                             mr.get_block_and_salt, 0))
3558         return d
3559
3560     def test_read_with_empty_sdmf_file(self):
3561         self.init("test_read_with_empty_sdmf_file")
3562
3563         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3564         d = defer.succeed(None)
3565         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1", empty=True))
3566         # We should be able to get the encoding parameters, and they
3567         # should be correct
3568         d.addCallback(lambda ignored:
3569             mr.get_encoding_parameters())
3570         def _check_encoding_parameters(params):
3571             self.failUnlessEqual(len(params), 4)
3572             k, n, segsize, datalen = params
3573             self.failUnlessEqual(k, 3)
3574             self.failUnlessEqual(n, 10)
3575             self.failUnlessEqual(segsize, 0)
3576             self.failUnlessEqual(datalen, 0)
3577         d.addCallback(_check_encoding_parameters)
3578
3579         # It does not make sense to get a block in this format, so we
3580         # should not be able to.
3581         d.addCallback(lambda ignored:
3582             self.shouldFail(LayoutInvalid, "get block on an empty file",
3583                             None,
3584                             mr.get_block_and_salt, 0))
3585         return d
3586
3587     def test_verinfo_with_sdmf_file(self):
3588         self.init("test_verinfo_with_sdmf_file")
3589
3590         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3591         # We should be able to get the version information.
3592         d = defer.succeed(None)
3593         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3594         d.addCallback(lambda ignored:
3595             mr.get_verinfo())
3596         def _check_verinfo(verinfo):
3597             self.failUnless(verinfo)
3598             self.failUnlessEqual(len(verinfo), 9)
3599             (seqnum,
3600              root_hash,
3601              salt,
3602              segsize,
3603              datalen,
3604              k,
3605              n,
3606              prefix,
3607              offsets) = verinfo
3608             self.failUnlessEqual(seqnum, 0)
3609             self.failUnlessEqual(root_hash, self.root_hash)
3610             self.failUnlessEqual(salt, self.salt)
3611             self.failUnlessEqual(segsize, 36)
3612             self.failUnlessEqual(datalen, 36)
3613             self.failUnlessEqual(k, 3)
3614             self.failUnlessEqual(n, 10)
3615             expected_prefix = struct.pack(">BQ32s16s BBQQ",
3616                                           0,
3617                                           seqnum,
3618                                           root_hash,
3619                                           salt,
3620                                           k,
3621                                           n,
3622                                           segsize,
3623                                           datalen)
3624             self.failUnlessEqual(prefix, expected_prefix)
3625             self.failUnlessEqual(offsets, self.offsets)
3626         d.addCallback(_check_verinfo)
3627         return d
3628
3629     def test_verinfo_with_mdmf_file(self):
3630         self.init("test_verinfo_with_mdmf_file")
3631
3632         mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3633         d = defer.succeed(None)
3634         d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
3635         d.addCallback(lambda ignored:
3636             mr.get_verinfo())
3637         def _check_verinfo(verinfo):
3638             self.failUnless(verinfo)
3639             self.failUnlessEqual(len(verinfo), 9)
3640             (seqnum,
3641              root_hash,
3642              IV,
3643              segsize,
3644              datalen,
3645              k,
3646              n,
3647              prefix,
3648              offsets) = verinfo
3649             self.failUnlessEqual(seqnum, 0)
3650             self.failUnlessEqual(root_hash, self.root_hash)
3651             self.failIf(IV, IV)
3652             self.failUnlessEqual(segsize, 6)
3653             self.failUnlessEqual(datalen, 36)
3654             self.failUnlessEqual(k, 3)
3655             self.failUnlessEqual(n, 10)
3656             expected_prefix = struct.pack(">BQ32s BBQQ",
3657                                           1,
3658                                           seqnum,
3659                                           root_hash,
3660                                           k,
3661                                           n,
3662                                           segsize,
3663                                           datalen)
3664             self.failUnlessEqual(prefix, expected_prefix)
3665             self.failUnlessEqual(offsets, self.offsets)
3666         d.addCallback(_check_verinfo)
3667         return d
3668
3669     def test_sdmf_writer(self):
3670         self.init("test_sdmf_writer")
3671
3672         # Go through the motions of writing an SDMF share to the storage
3673         # server. Then read the storage server to see that the share got
3674         # written in the way that we think it should have.
3675
3676         # We do this first so that the necessary instance variables get
3677         # set the way we want them for the tests below.
3678         data = self.build_test_sdmf_share()
3679         sdmfr = SDMFSlotWriteProxy(0,
3680                                    self.rref,
3681                                    "si1",
3682                                    self.secrets,
3683                                    0, 3, 10, 36, 36)
3684         # Put the block and salt.
3685         sdmfr.put_block(self.blockdata, 0, self.salt)
3686
3687         # Put the encprivkey
3688         sdmfr.put_encprivkey(self.encprivkey)
3689
3690         # Put the block and share hash chains
3691         sdmfr.put_blockhashes(self.block_hash_tree)
3692         sdmfr.put_sharehashes(self.share_hash_chain)
3693         sdmfr.put_root_hash(self.root_hash)
3694
3695         # Put the signature
3696         sdmfr.put_signature(self.signature)
3697
3698         # Put the verification key
3699         sdmfr.put_verification_key(self.verification_key)
3700
3701         # Now check to make sure that nothing has been written yet.
3702         self.failUnlessEqual(self.rref.write_count, 0)
3703
3704         # Now finish publishing
3705         d = sdmfr.finish_publishing()
3706         d.addCallback(lambda ign: self.failUnlessEqual(self.rref.write_count, 1))
3707         d.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(0, len(data))]))
3708         d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
3709         return d
3710
3711     def test_sdmf_writer_preexisting_share(self):
3712         self.init("test_sdmf_writer_preexisting_share")
3713
3714         data = self.build_test_sdmf_share()
3715         d = defer.succeed(None)
3716         d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3717         def _written(ign):
3718             # Now there is a share on the storage server. To successfully
3719             # write, we need to set the checkstring correctly. When we
3720             # don't, no write should occur.
3721             sdmfw = SDMFSlotWriteProxy(0,
3722                                        self.rref,
3723                                        "si1",
3724                                        self.secrets,
3725                                        1, 3, 10, 36, 36)
3726             sdmfw.put_block(self.blockdata, 0, self.salt)
3727
3728             # Put the encprivkey
3729             sdmfw.put_encprivkey(self.encprivkey)
3730
3731             # Put the block and share hash chains
3732             sdmfw.put_blockhashes(self.block_hash_tree)
3733             sdmfw.put_sharehashes(self.share_hash_chain)
3734
3735             # Put the root hash
3736             sdmfw.put_root_hash(self.root_hash)
3737
3738             # Put the signature
3739             sdmfw.put_signature(self.signature)
3740
3741             # Put the verification key
3742             sdmfw.put_verification_key(self.verification_key)
3743
3744             # We shouldn't have a checkstring yet
3745             self.failUnlessEqual(sdmfw.get_checkstring(), "")
3746
3747             d2 = sdmfw.finish_publishing()
3748             def _then(results):
3749                 self.failIf(results[0])
3750                 # this is the correct checkstring
3751                 self._expected_checkstring = results[1][0][0]
3752                 return self._expected_checkstring
3753             d2.addCallback(_then)
3754             d2.addCallback(sdmfw.set_checkstring)
3755             d2.addCallback(lambda ign: sdmfw.get_checkstring())
3756             d2.addCallback(lambda checkstring: self.failUnlessEqual(checkstring,
3757                                                                     self._expected_checkstring))
3758             d2.addCallback(lambda ign: sdmfw.finish_publishing())
3759             d2.addCallback(lambda res: self.failUnless(res[0], res))
3760             d2.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(1, 8)]))
3761             d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [struct.pack(">Q", 1)]}))
3762             d2.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(9, len(data) - 9)]))
3763             d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[9:]]}))
3764             return d2
3765         d.addCallback(_written)
3766         return d
3767
3768
class Stats(WithDiskBackend, unittest.TestCase):
    """Checks the latency statistics reported by the server's get_latencies()."""

    def test_latencies(self):
        """
        Record a different number of samples in each of five categories
        and compare the reported mean and percentiles with the expected
        values; None is expected where the test anticipates no value.
        """
        server = self.create("test_latencies")
        for i in range(10000):
            server.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            server.add_latency("renew", 1.0 * i)
        for i in range(20):
            server.add_latency("write", 1.0 * i)
        for i in range(10):
            server.add_latency("cancel", 2.0 * i)
        server.add_latency("get", 5.0)

        output = server.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "write", "get"]))

        stat_keys = ["mean",
                     "01_0_percentile",
                     "10_0_percentile",
                     "50_0_percentile",
                     "90_0_percentile",
                     "95_0_percentile",
                     "99_0_percentile",
                     "99_9_percentile"]

        # (category, retained sample count, expected value per stat_keys
        # entry). None means the statistic itself is expected to be None.
        # "allocate" was fed 10000 samples but only 1000 are expected to
        # be retained.
        expectations = [
            ("allocate", 1000,
             [9500, 9010, 9100, 9500, 9900, 9950, 9990, 9999]),
            ("renew", 1000,
             [500, 10, 100, 500, 900, 950, 990, 999]),
            ("write", 20,
             [9, None, 2, 10, 18, 19, None, None]),
            ("cancel", 10,
             [9, None, 2, 10, 18, None, None, None]),
            ("get", 1,
             [None, None, None, None, None, None, None, None]),
        ]

        for category, retained, wanted_values in expectations:
            self.failUnlessEqual(len(server.latencies[category]), retained)
            stats = output[category]
            for key, wanted in zip(stat_keys, wanted_values):
                if wanted is None:
                    self.failUnless(stats[key] is None, output)
                else:
                    # Numeric expectations are matched to within 1.
                    self.failUnless(abs(stats[key] - wanted) < 1, output)
3835
3836
def remove_tags(s):
    """
    Return s with every '<...>' run replaced by a space and each run of
    whitespace collapsed to a single space.
    """
    without_markup = re.sub(r'<[^>]*>', ' ', s)
    return re.sub(r'\s+', ' ', without_markup)
3841
3842
class BucketCounterTest(WithDiskBackend, CrawlerTestMixin, ReallyEqualMixin, unittest.TestCase):
    """Tests of the server's bucket counter and of how the status page reports it."""

    def test_bucket_counter(self):
        """Sample the status page before the crawl, mid-crawl, and after one cycle."""
        server = self.create("test_bucket_counter", detached=True)
        counter = server.bucket_counter

        # let the crawl complete as quickly as it can
        counter.slow_start = 0
        counter.cpu_slice = 100.0

        d = counter.set_hook('after_prefix')

        server.setServiceParent(self.sparent)

        status_page = StorageStatus(server)

        # first sample: the crawler has not done any work yet
        html = status_page.renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        for expected in ("Accepting new shares: Yes",
                         "Reserved space: - 0 B (0)",
                         "Total sharesets: Not computed yet",
                         "Next crawl in"):
            self.failUnlessIn(expected, text)

        def _first_prefix_done(prefix):
            counter.save_state()
            state = counter.get_state()
            self.failUnlessEqual(prefix, state["last-complete-prefix"])
            self.failUnlessEqual(prefix, counter.prefixes[0])

            # second sample: a crawl is under way
            text = remove_tags(status_page.renderSynchronously())
            self.failUnlessIn(" Current crawl ", text)
            self.failUnlessIn(" (next work in ", text)

            return counter.set_hook('after_cycle')
        d.addCallback(_first_prefix_done)

        def _first_cycle_done(cycle):
            self.failUnlessEqual(cycle, 0)
            in_progress = counter.get_progress()["cycle-in-progress"]
            self.failUnlessReallyEqual(in_progress, False)
        d.addCallback(_first_cycle_done)
        d.addBoth(self._wait_for_yield, counter)

        def _crawler_yielded(ign):
            # third sample: the first cycle has finished
            text = remove_tags(status_page.renderSynchronously())
            self.failUnlessIn("Total sharesets: 0 (the number of", text)
            next_crawl_shown = ("Next crawl in 59 minutes" in text or
                                "Next crawl in 60 minutes" in text)
            self.failUnless(next_crawl_shown, text)
        d.addCallback(_crawler_yielded)
        return d

    def test_bucket_counter_cleanup(self):
        """Plant a bogus bucket-counts entry mid-cycle and expect it gone afterwards."""
        server = self.create("test_bucket_counter_cleanup", detached=True)
        counter = server.bucket_counter

        # let the crawl complete as quickly as it can
        counter.slow_start = 0
        counter.cpu_slice = 100.0

        d = counter.set_hook('after_prefix')

        server.setServiceParent(self.sparent)

        def _first_prefix_done(prefix):
            counter.save_state()
            live_state = counter.state
            self.failUnlessEqual(prefix, live_state["last-complete-prefix"])
            self.failUnlessEqual(prefix, counter.prefixes[0])

            # tamper with the crawler's state behind its back; the entry
            # added here should have been cleaned up by the end of the cycle
            live_state["bucket-counts"][-12] = {}
            counter.save_state()

            return counter.set_hook('after_cycle')
        d.addCallback(_first_prefix_done)

        def _first_cycle_done(cycle):
            self.failUnlessEqual(cycle, 0)
            in_progress = counter.get_progress()["cycle-in-progress"]
            self.failUnlessReallyEqual(in_progress, False)

            counts = counter.get_state()["bucket-counts"]
            self.failIf(-12 in counts, counts.keys())
        d.addCallback(_first_cycle_done)
        d.addBoth(self._wait_for_yield, counter)
        return d

    def test_bucket_counter_eta(self):
        """Check that the status page shows an ETA once a second prefix is done."""
        server = self.create("test_bucket_counter_eta", detached=True)
        counter = server.bucket_counter

        # let the crawl complete as quickly as it can
        counter.slow_start = 0
        counter.cpu_slice = 100.0

        d = counter.set_hook('after_prefix')

        server.setServiceParent(self.sparent)

        status_page = StorageStatus(server)

        def _first_prefix_done(prefix1):
            # after a single prefix there is no ETA to show yet
            text = remove_tags(status_page.renderSynchronously())
            self.failUnlessIn("complete (next work", text)

            return counter.set_hook('after_prefix')
        d.addCallback(_first_prefix_done)

        def _second_prefix_done(prefix2):
            # by now an ETA derived from the elapsed time should be shown
            text = remove_tags(status_page.renderSynchronously())
            self.failUnlessIn("complete (ETA ", text)
        d.addCallback(_second_prefix_done)
        d.addBoth(self._wait_for_yield, counter)
        return d
3964
3965
3966 class AccountingCrawlerTest(CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
3967     def make_shares(self, server):
3968         aa = server.get_accountant().get_anonymous_account()
3969         sa = server.get_accountant().get_starter_account()
3970
3971         def make(si):
3972             return (si, hashutil.tagged_hash("renew", si),
3973                     hashutil.tagged_hash("cancel", si))
3974         def make_mutable(si):
3975             return (si, hashutil.tagged_hash("renew", si),
3976                     hashutil.tagged_hash("cancel", si),
3977                     hashutil.tagged_hash("write-enabler", si))
3978         def make_extra_lease(si, num):
3979             return (hashutil.tagged_hash("renew-%d" % num, si),
3980                     hashutil.tagged_hash("cancel-%d" % num, si))
3981
3982         writev = aa.remote_slot_testv_and_readv_and_writev
3983
3984         immutable_si_0, rs0, cs0 = make("\x00" * 16)
3985         immutable_si_1, rs1, cs1 = make("\x01" * 16)
3986         rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3987         mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3988         mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3989         rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3990         sharenums = [0]
3991         canary = FakeCanary()
3992         # note: 'tahoe debug dump-share' will not handle this file, since the
3993         # inner contents are not a valid CHK share
3994         data = "\xff" * 1000
3995
3996         self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3997         self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3998         self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
3999
4000         d = defer.succeed(None)
4001         d.addCallback(lambda ign: aa.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
4002                                                              1000, canary))
4003         def _got_buckets( (a, w) ):
4004             w[0].remote_write(0, data)
4005             w[0].remote_close()
4006         d.addCallback(_got_buckets)
4007
4008         d.addCallback(lambda ign: aa.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
4009                                                              1000, canary))
4010         d.addCallback(_got_buckets)
4011         d.addCallback(lambda ign: sa.remote_add_lease(immutable_si_1, rs1a, cs1a))
4012
4013         d.addCallback(lambda ign: writev(mutable_si_2, (we2, rs2, cs2),
4014                                          {0: ([], [(0,data)], len(data))}, []))
4015         d.addCallback(lambda ign: writev(mutable_si_3, (we3, rs3, cs3),
4016                                          {0: ([], [(0,data)], len(data))}, []))
4017         d.addCallback(lambda ign: sa.remote_add_lease(mutable_si_3, rs3a, cs3a))
4018         return d
4019
4020     def test_basic(self):
4021         server = self.create("test_basic", detached=True)
4022
4023         ep = ExpirationPolicy(enabled=False)
4024         server.get_accountant().set_expiration_policy(ep)
4025         aa = server.get_accountant().get_anonymous_account()
4026         sa = server.get_accountant().get_starter_account()
4027
4028         # finish as fast as possible
4029         ac = server.get_accounting_crawler()
4030         ac.slow_start = 0
4031         ac.cpu_slice = 500
4032
4033         webstatus = StorageStatus(server)
4034
4035         # create a few shares, with some leases on them
4036         d = self.make_shares(server)
4037         def _do_test(ign):
4038             [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4039
4040             if isinstance(server.backend, DiskBackend):
4041                 # add a non-sharefile to exercise another code path
4042                 fn = os.path.join(server.backend._sharedir,
4043                                   storage_index_to_dir(immutable_si_0),
4044                                   "not-a-share")
4045                 fileutil.write(fn, "I am not a share.\n")
4046
4047             # this is before the crawl has started, so we're not in a cycle yet
4048             initial_state = ac.get_state()
4049             self.failIf(ac.get_progress()["cycle-in-progress"])
4050             self.failIfIn("cycle-to-date", initial_state)
4051             self.failIfIn("estimated-remaining-cycle", initial_state)
4052             self.failIfIn("estimated-current-cycle", initial_state)
4053             self.failUnlessIn("history", initial_state)
4054             self.failUnlessEqual(initial_state["history"], {})
4055
4056             server.setServiceParent(self.sparent)
4057
4058             DAY = 24*60*60
4059
4060             # now examine the state right after the 'aa' prefix has been processed.
4061             d2 = self._after_prefix(None, 'aa', ac)
4062             def _after_aa_prefix(state):
4063                 self.failUnlessIn("cycle-to-date", state)
4064                 self.failUnlessIn("estimated-remaining-cycle", state)
4065                 self.failUnlessIn("estimated-current-cycle", state)
4066                 self.failUnlessIn("history", state)
4067                 self.failUnlessEqual(state["history"], {})
4068
4069                 so_far = state["cycle-to-date"]
4070                 self.failUnlessEqual(so_far["expiration-enabled"], False)
4071                 self.failUnlessIn("configured-expiration-mode", so_far)
4072                 self.failUnlessIn("lease-age-histogram", so_far)
4073                 lah = so_far["lease-age-histogram"]
4074                 self.failUnlessEqual(type(lah), list)
4075                 self.failUnlessEqual(len(lah), 1)
4076                 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
4077                 self.failUnlessEqual(so_far["corrupt-shares"], [])
4078                 sr1 = so_far["space-recovered"]
4079                 self.failUnlessEqual(sr1["examined-buckets"], 1)
4080                 self.failUnlessEqual(sr1["examined-shares"], 1)
4081                 self.failUnlessEqual(sr1["actual-shares"], 0)
4082                 left = state["estimated-remaining-cycle"]
4083                 sr2 = left["space-recovered"]
4084                 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
4085                 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
4086                 self.failIfEqual(sr2["actual-shares"], None)
4087             d2.addCallback(_after_aa_prefix)
4088
4089             d2.addCallback(lambda ign: self.render1(webstatus))
4090             def _check_html_in_cycle(html):
4091                 s = remove_tags(html)
4092                 self.failUnlessIn("So far, this cycle has examined "
4093                                   "1 shares in 1 sharesets (0 mutable / 1 immutable) ", s)
4094                 self.failUnlessIn("and has recovered: "
4095                                   "0 shares, 0 sharesets (0 mutable / 0 immutable), "
4096                                   "0 B (0 B / 0 B)", s)
4097
4098                 return ac.set_hook('after_cycle')
4099             d2.addCallback(_check_html_in_cycle)
4100
4101             def _after_first_cycle(cycle):
4102                 # After the first cycle, nothing should have been removed.
4103                 self.failUnlessEqual(cycle, 0)
4104                 progress = ac.get_progress()
4105                 self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
4106
4107                 s = ac.get_state()
4108                 self.failIf("cycle-to-date" in s)
4109                 self.failIf("estimated-remaining-cycle" in s)
4110                 self.failIf("estimated-current-cycle" in s)
4111                 last = s["history"][0]
4112                 self.failUnlessEqual(type(last), dict, repr(last))
4113                 self.failUnlessIn("cycle-start-finish-times", last)
4114                 self.failUnlessEqual(type(last["cycle-start-finish-times"]), list, repr(last))
4115                 self.failUnlessEqual(last["expiration-enabled"], False)
4116                 self.failUnlessIn("configured-expiration-mode", last)
4117
4118                 self.failUnlessIn("lease-age-histogram", last)
4119                 lah = last["lease-age-histogram"]
4120                 self.failUnlessEqual(type(lah), list)
4121                 self.failUnlessEqual(len(lah), 1)
4122                 self.failUnlessEqual(tuple(lah[0]), (0.0, DAY, 6) )
4123
4124                 self.failUnlessEqual(last["corrupt-shares"], [])
4125
4126                 rec = last["space-recovered"]
4127                 self.failUnlessEqual(rec["examined-buckets"], 4)
4128                 self.failUnlessEqual(rec["examined-shares"], 4)
4129                 self.failUnlessEqual(rec["actual-buckets"], 0)
4130                 self.failUnlessEqual(rec["actual-shares"], 0)
4131                 self.failUnlessEqual(rec["actual-diskbytes"], 0)
4132
4133                 def count_leases(si):
4134                     return (len(aa.get_leases(si)), len(sa.get_leases(si)))
4135                 self.failUnlessEqual(count_leases(immutable_si_0), (1, 0))
4136                 self.failUnlessEqual(count_leases(immutable_si_1), (1, 1))
4137                 self.failUnlessEqual(count_leases(mutable_si_2), (1, 0))
4138                 self.failUnlessEqual(count_leases(mutable_si_3), (1, 1))
4139             d2.addCallback(_after_first_cycle)
4140
4141             d2.addCallback(lambda ign: self.render1(webstatus))
4142             def _check_html_after_cycle(html):
4143                 s = remove_tags(html)
4144                 self.failUnlessIn("recovered: 0 shares, 0 sharesets "
4145                                   "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
4146                 self.failUnlessIn("and saw a total of 4 shares, 4 sharesets "
4147                                   "(2 mutable / 2 immutable),", s)
4148                 self.failUnlessIn("but expiration was not enabled", s)
4149             d2.addCallback(_check_html_after_cycle)
4150
4151             d2.addCallback(lambda ign: self.render_json(webstatus))
4152             def _check_json_after_cycle(json):
4153                 data = simplejson.loads(json)
4154                 self.failUnlessIn("lease-checker", data)
4155                 self.failUnlessIn("lease-checker-progress", data)
4156             d2.addCallback(_check_json_after_cycle)
4157             d2.addBoth(self._wait_for_yield, ac)
4158             return d2
4159         d.addCallback(_do_test)
4160         return d
4161
4162     def _assert_sharecount(self, server, si, expected):
4163         d = defer.succeed(None)
4164         d.addCallback(lambda ign: server.backend.get_shareset(si).get_shares())
4165         def _got_shares( (shares, corrupted) ):
4166             self.failUnlessEqual(len(shares), expected, "share count for %r" % (si,))
4167             self.failUnlessEqual(len(corrupted), 0, str(corrupted))
4168         d.addCallback(_got_shares)
4169         return d
4170
4171     def _assert_leasecount(self, server, si, expected):
4172         aa = server.get_accountant().get_anonymous_account()
4173         sa = server.get_accountant().get_starter_account()
4174         self.failUnlessEqual((len(aa.get_leases(si)), len(sa.get_leases(si))),
4175                              expected)
4176
4177     def test_expire_age(self):
4178         server = self.create("test_expire_age", detached=True)
4179
4180         # setting expiration_time to 2000 means that any lease which is more
4181         # than 2000s old will be expired.
4182         now = time.time()
4183         ep = ExpirationPolicy(enabled=True, mode="age", override_lease_duration=2000)
4184         server.get_accountant().set_expiration_policy(ep)
4185         aa = server.get_accountant().get_anonymous_account()
4186         sa = server.get_accountant().get_starter_account()
4187
4188         # finish as fast as possible
4189         ac = server.get_accounting_crawler()
4190         ac.slow_start = 0
4191         ac.cpu_slice = 500
4192
4193         webstatus = StorageStatus(server)
4194
4195         # create a few shares, with some leases on them
4196         d = self.make_shares(server)
4197         def _do_test(ign):
4198             [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4199
4200             d2 = defer.succeed(None)
4201             d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 1))
4202             d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_0, (1, 0)))
4203             d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4204             d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 1)))
4205             d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2,   1))
4206             d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_2,   (1, 0)))
4207             d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3,   1))
4208             d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3,   (1, 1)))
4209
4210             def _then(ign):
4211                 # artificially crank back the renewal time on the first lease of each
4212                 # share to 3000s ago, and set the expiration time to 31 days later.
4213                 new_renewal_time = now - 3000
4214                 new_expiration_time = new_renewal_time + 31*24*60*60
4215
4216                 # Some shares have an extra lease which is set to expire at the
4217                 # default time in 31 days from now (age=31days). We then run the
4218                 # crawler, which will expire the first lease, making some shares get
4219                 # deleted and others stay alive (with one remaining lease)
4220
4221                 aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
4222
4223                 # immutable_si_1 gets an extra lease
4224                 sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
4225
4226                 aa.add_or_renew_lease(mutable_si_2,   0, new_renewal_time, new_expiration_time)
4227
4228                 # mutable_si_3 gets an extra lease
4229                 sa.add_or_renew_lease(mutable_si_3,   0, new_renewal_time, new_expiration_time)
4230
4231                 server.setServiceParent(self.sparent)
4232
4233                 # now examine the web status right after the 'aa' prefix has been processed.
4234                 d3 = self._after_prefix(None, 'aa', ac)
4235                 d3.addCallback(lambda ign: self.render1(webstatus))
4236                 def _check_html_in_cycle(html):
4237                     s = remove_tags(html)
4238                     # the first shareset encountered gets deleted, and its prefix
4239                     # happens to be about 1/5th of the way through the ring, so the
4240                     # predictor thinks we'll have 5 shares and that we'll delete them
4241                     # all. This part of the test depends upon the SIs landing right
4242                     # where they do now.
4243                     self.failUnlessIn("The remainder of this cycle is expected to "
4244                                       "recover: 4 shares, 4 sharesets", s)
4245                     self.failUnlessIn("The whole cycle is expected to examine "
4246                                       "5 shares in 5 sharesets and to recover: "
4247                                       "5 shares, 5 sharesets", s)
4248
4249                     return ac.set_hook('after_cycle')
4250                 d3.addCallback(_check_html_in_cycle)
4251
4252                 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 0))
4253                 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4254                 d3.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 0)))
4255                 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2,   0))
4256                 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3,   1))
4257                 d3.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3,   (1, 0)))
4258
4259                 def _after_first_cycle(ignored):
4260                     s = ac.get_state()
4261                     last = s["history"][0]
4262
4263                     self.failUnlessEqual(last["expiration-enabled"], True)
4264                     cem = last["configured-expiration-mode"]
4265                     self.failUnlessEqual(cem[0], "age")
4266                     self.failUnlessEqual(cem[1], 2000)
4267                     self.failUnlessEqual(cem[2], None)
4268                     self.failUnlessEqual(cem[3][0], "mutable")
4269                     self.failUnlessEqual(cem[3][1], "immutable")
4270
4271                     rec = last["space-recovered"]
4272                     self.failUnlessEqual(rec["examined-buckets"], 4)
4273                     self.failUnlessEqual(rec["examined-shares"], 4)
4274                     self.failUnlessEqual(rec["actual-buckets"], 2)
4275                     self.failUnlessEqual(rec["actual-shares"], 2)
4276                     # different platforms have different notions of "blocks used by
4277                     # this file", so merely assert that it's a number
4278                     self.failUnless(rec["actual-diskbytes"] >= 0,
4279                                     rec["actual-diskbytes"])
4280                 d3.addCallback(_after_first_cycle)
4281
4282                 d3.addCallback(lambda ign: self.render1(webstatus))
4283                 def _check_html_after_cycle(html):
4284                     s = remove_tags(html)
4285                     self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
4286                     self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
4287                     self.failUnlessIn(" recovered: 2 shares, 2 sharesets (1 mutable / 1 immutable), ", s)
4288                 d3.addCallback(_check_html_after_cycle)
4289                 d3.addBoth(self._wait_for_yield, ac)
4290                 return d3
4291             d2.addCallback(_then)
4292             return d2
4293         d.addCallback(_do_test)
4294         return d
4295
4296     def test_expire_cutoff_date(self):
4297         server = self.create("test_expire_cutoff_date", detached=True)
4298
4299         # setting cutoff-date to 2000 seconds ago means that any lease which
4300         # is more than 2000s old will be expired.
4301         now = time.time()
4302         then = int(now - 2000)
4303         ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then)
4304         server.get_accountant().set_expiration_policy(ep)
4305         aa = server.get_accountant().get_anonymous_account()
4306         sa = server.get_accountant().get_starter_account()
4307
4308         # finish as fast as possible
4309         ac = server.get_accounting_crawler()
4310         ac.slow_start = 0
4311         ac.cpu_slice = 500
4312
4313         webstatus = StorageStatus(server)
4314
4315         # create a few shares, with some leases on them
4316         d = self.make_shares(server)
4317         def _do_test(ign):
4318             [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4319
4320             d2 = defer.succeed(None)
4321             d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 1))
4322             d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_0, (1, 0)))
4323             d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4324             d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 1)))
4325             d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2,   1))
4326             d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_2,   (1, 0)))
4327             d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3,   1))
4328             d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3,   (1, 1)))
4329
4330             def _then(ign):
4331                 # artificially crank back the renewal time on the first lease of each
4332                 # share to 3000s ago, and set the expiration time to 31 days later.
4333                 new_renewal_time = now - 3000
4334                 new_expiration_time = new_renewal_time + 31*24*60*60
4335
4336                 # Some shares have an extra lease which is set to expire at the
4337                 # default time in 31 days from now (age=31days). We then run the
4338                 # crawler, which will expire the first lease, making some shares get
4339                 # deleted and others stay alive (with one remaining lease)
4340
4341                 aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
4342
4343                 # immutable_si_1 gets an extra lease
4344                 sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
4345
4346                 aa.add_or_renew_lease(mutable_si_2,   0, new_renewal_time, new_expiration_time)
4347
4348                 # mutable_si_3 gets an extra lease
4349                 sa.add_or_renew_lease(mutable_si_3,   0, new_renewal_time, new_expiration_time)
4350
4351                 server.setServiceParent(self.sparent)
4352
4353                 # now examine the web status right after the 'aa' prefix has been processed.
4354                 d3 = self._after_prefix(None, 'aa', ac)
4355                 d3.addCallback(lambda ign: self.render1(webstatus))
4356                 def _check_html_in_cycle(html):
4357                     s = remove_tags(html)
4358                     # the first bucket encountered gets deleted, and its prefix
4359                     # happens to be about 1/5th of the way through the ring, so the
4360                     # predictor thinks we'll have 5 shares and that we'll delete them
4361                     # all. This part of the test depends upon the SIs landing right
4362                     # where they do now.
4363                     self.failUnlessIn("The remainder of this cycle is expected to "
4364                                       "recover: 4 shares, 4 sharesets", s)
4365                     self.failUnlessIn("The whole cycle is expected to examine "
4366                                       "5 shares in 5 sharesets and to recover: "
4367                                       "5 shares, 5 sharesets", s)
4368
4369                     return ac.set_hook('after_cycle')
4370                 d3.addCallback(_check_html_in_cycle)
4371
4372                 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 0))
4373                 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4374                 d3.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 0)))
4375                 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2,   0))
4376                 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3,   1))
4377                 d3.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3,   (1, 0)))
4378
4379                 def _after_first_cycle(ignored):
4380                     s = ac.get_state()
4381                     last = s["history"][0]
4382
4383                     self.failUnlessEqual(last["expiration-enabled"], True)
4384                     cem = last["configured-expiration-mode"]
4385                     self.failUnlessEqual(cem[0], "cutoff-date")
4386                     self.failUnlessEqual(cem[1], None)
4387                     self.failUnlessEqual(cem[2], then)
4388                     self.failUnlessEqual(cem[3][0], "mutable")
4389                     self.failUnlessEqual(cem[3][1], "immutable")
4390
4391                     rec = last["space-recovered"]
4392                     self.failUnlessEqual(rec["examined-buckets"], 4)
4393                     self.failUnlessEqual(rec["examined-shares"], 4)
4394                     self.failUnlessEqual(rec["actual-buckets"], 2)
4395                     self.failUnlessEqual(rec["actual-shares"], 2)
4396                     # different platforms have different notions of "blocks used by
4397                     # this file", so merely assert that it's a number
4398                     self.failUnless(rec["actual-diskbytes"] >= 0,
4399                                     rec["actual-diskbytes"])
4400                 d3.addCallback(_after_first_cycle)
4401
4402                 d3.addCallback(lambda ign: self.render1(webstatus))
4403                 def _check_html_after_cycle(html):
4404                     s = remove_tags(html)
4405                     self.failUnlessIn("Expiration Enabled:"
4406                                       " expired leases will be removed", s)
4407                     date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
4408                     substr = "Leases created or last renewed before %s will be considered expired." % date
4409                     self.failUnlessIn(substr, s)
4410                     self.failUnlessIn(" recovered: 2 shares, 2 sharesets (1 mutable / 1 immutable), ", s)
4411                 d3.addCallback(_check_html_after_cycle)
4412                 d3.addBoth(self._wait_for_yield, ac)
4413                 return d3
4414             d2.addCallback(_then)
4415             return d2
4416         d.addCallback(_do_test)
4417         return d
4418
4419     def test_bad_mode(self):
4420         e = self.failUnlessRaises(AssertionError,
4421                                   ExpirationPolicy, enabled=True, mode="bogus")
4422         self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(e))
4423
4424     def test_parse_duration(self):
4425         DAY = 24*60*60
4426         MONTH = 31*DAY
4427         YEAR = 365*DAY
4428         p = time_format.parse_duration
4429         self.failUnlessEqual(p("7days"), 7*DAY)
4430         self.failUnlessEqual(p("31day"), 31*DAY)
4431         self.failUnlessEqual(p("60 days"), 60*DAY)
4432         self.failUnlessEqual(p("2mo"), 2*MONTH)
4433         self.failUnlessEqual(p("3 month"), 3*MONTH)
4434         self.failUnlessEqual(p("2years"), 2*YEAR)
4435         e = self.failUnlessRaises(ValueError, p, "2kumquats")
4436         self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
4437
4438     def test_parse_date(self):
4439         p = time_format.parse_date
4440         self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
4441         self.failUnlessEqual(p("2009-03-18"), 1237334400)
4442
4443     def test_limited_history(self):
4444         server = self.create("test_limited_history", detached=True)
4445
4446         # finish as fast as possible
4447         RETAINED = 2
4448         CYCLES = 4
4449         ac = server.get_accounting_crawler()
4450         ac._leasedb.retained_history_entries = RETAINED
4451         ac.slow_start = 0
4452         ac.cpu_slice = 500
4453         ac.allowed_cpu_proportion = 1.0
4454         ac.minimum_cycle_time = 0
4455
4456         # create a few shares, with some leases on them
4457         d = self.make_shares(server)
4458         def _do_test(ign):
4459             server.setServiceParent(self.sparent)
4460
4461             d2 = ac.set_hook('after_cycle')
4462             def _after_cycle(cycle):
4463                 if cycle < CYCLES:
4464                     return ac.set_hook('after_cycle').addCallback(_after_cycle)
4465
4466                 state = ac.get_state()
4467                 self.failUnlessIn("history", state)
4468                 h = state["history"]
4469                 self.failUnlessEqual(len(h), RETAINED)
4470                 self.failUnlessEqual(max(h.keys()), CYCLES)
4471                 self.failUnlessEqual(min(h.keys()), CYCLES-RETAINED+1)
4472             d2.addCallback(_after_cycle)
4473             d2.addBoth(self._wait_for_yield, ac)
4474         d.addCallback(_do_test)
4475         return d
4476
4477     def render_json(self, page):
4478         d = self.render1(page, args={"t": ["json"]})
4479         return d
4480
4481
class AccountingCrawlerWithDiskBackend(WithDiskBackend, AccountingCrawlerTest, unittest.TestCase):
    """Concrete TestCase combining AccountingCrawlerTest with the WithDiskBackend mixin."""
4484
4485
4486 #class AccountingCrawlerWithMockCloudBackend(WithMockCloudBackend, AccountingCrawlerTest, unittest.TestCase):
4487 #    pass
4488
4489
class WebStatusWithDiskBackend(WithDiskBackend, WebRenderingMixin, unittest.TestCase):
    # Checks of the StorageStatus page (HTML and JSON renderings) for servers
    # built by self.create(), plus a few checks that need no server at all.

    def test_no_server(self):
        # A status page constructed without a server says so.
        html = StorageStatus(None).renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", html)

    def test_status(self):
        server = self.create("test_status")
        status_page = StorageStatus(server, "nickname")

        def _check_html(html):
            self.failUnlessIn("<h1>Storage Server Status</h1>", html)
            text = remove_tags(html)
            self.failUnlessIn("Server Nickname: nickname", text)
            nodeid_line = "Server Nodeid: %s" % base32.b2a(server.get_serverid())
            self.failUnlessIn(nodeid_line, text)
            self.failUnlessIn("Accepting new shares: Yes", text)
            self.failUnlessIn("Reserved space: - 0 B (0)", text)

        def _render_json(ign):
            return self.render_json(status_page)

        def _check_json(json_text):
            data = simplejson.loads(json_text)
            stats = data["stats"]
            self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(stats["storage_server.reserved_space"], 0)
            self.failUnlessIn("bucket-counter", data)
            self.failUnlessIn("lease-checker", data)

        # HTML rendering first, then the JSON rendering of the same page
        d = self.render1(status_page)
        d.addCallback(_check_html)
        d.addCallback(_render_json)
        d.addCallback(_check_json)
        return d

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_no_disk_stats(self, mock_get_disk_stats):
        # Some platforms may have no disk stats API. Make sure the code can
        # handle that (test runs on all platforms). The missing API is
        # simulated by having the patched function raise AttributeError.
        mock_get_disk_stats.side_effect = AttributeError()

        server = self.create("test_status_no_disk_stats")

        html = StorageStatus(server).renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        for expected in ("Accepting new shares: Yes",
                         "Total disk space: ?",
                         "Space Available to Tahoe: ?"):
            self.failUnlessIn(expected, text)
        self.failUnless(server.get_available_space() is None)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_bad_disk_stats(self, mock_get_disk_stats):
        # If the API to get disk stats exists but a call to it fails, then
        # the status should show that no shares will be accepted, and
        # get_available_space() should be 0. The failing call is simulated
        # by having the patched function raise OSError.
        mock_get_disk_stats.side_effect = OSError()

        server = self.create("test_status_bad_disk_stats")

        html = StorageStatus(server).renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        for expected in ("Accepting new shares: No",
                         "Total disk space: ?",
                         "Space Available to Tahoe: ?"):
            self.failUnlessIn(expected, text)
        self.failUnlessEqual(server.get_available_space(), 0)

    @mock.patch('allmydata.util.fileutil.get_disk_stats')
    def test_status_right_disk_stats(self, mock_get_disk_stats):
        # Feed fixed disk statistics through the patched get_disk_stats and
        # check both how it was called and what the status page reports.
        GB = 1000000000
        total = 5*GB
        free_for_root = 4*GB
        free_for_nonroot = 3*GB
        reserved_space = 1*GB
        mock_get_disk_stats.return_value = dict(
            total=total,
            free_for_root=free_for_root,
            free_for_nonroot=free_for_nonroot,
            used=total - free_for_root,
            avail=max(free_for_nonroot - reserved_space, 0)
        )

        server = self.create("test_status_right_disk_stats", reserved_space=GB)
        expecteddir = server.backend._sharedir
        expected_call = ((expecteddir, reserved_space), {})

        html = StorageStatus(server).renderSynchronously()

        # every recorded call must have been (expecteddir, reserved_space)
        unexpected_calls = [call for call in mock_get_disk_stats.call_args_list
                            if call != expected_call]
        self.failIf(unexpected_calls,
                    (mock_get_disk_stats.call_args_list, expecteddir, reserved_space))

        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        for expected in ("Total disk space: 5.00 GB",
                         "Disk space used: - 1.00 GB",
                         "Disk space free (root): 4.00 GB",
                         "Disk space free (non-root): 3.00 GB",
                         "Reserved space: - 1.00 GB",
                         "Space Available to Tahoe: 2.00 GB"):
            self.failUnlessIn(expected, text)
        self.failUnlessEqual(server.get_available_space(), 2*GB)

    def test_readonly(self):
        # A server created with readonly=True reports that it is not
        # accepting new shares.
        server = self.create("test_readonly", readonly=True)

        html = StorageStatus(server).renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        self.failUnlessIn("Accepting new shares: No", text)

    def test_reserved(self):
        # The configured reserved space appears on the page both abbreviated
        # and as an exact number.
        server = self.create("test_reserved", reserved_space=10e6)

        html = StorageStatus(server).renderSynchronously()
        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
        text = remove_tags(html)
        self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", text)

    def test_util(self):
        # Direct checks of the page's space-rendering helpers and of
        # remove_prefix.
        page = StorageStatus(None)
        render_cases = [
            (page.render_space, None, "?"),
            (page.render_space, 10e6, "10000000"),
            (page.render_abbrev_space, None, "?"),
            (page.render_abbrev_space, 10e6, "10.00 MB"),
        ]
        for render, value, expected in render_cases:
            self.failUnlessEqual(render(None, value), expected)

        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
4616
4617
4618 class WebStatusWithMockCloudBackend(WithMockCloudBackend, WebRenderingMixin, unittest.TestCase):
4619     def test_status(self):
4620         server = self.create("test_status")
4621
4622         w = StorageStatus(server, "nickname")
4623         d = self.render1(w)
4624         def _check_html(html):
4625             self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4626             s = remove_tags(html)
4627             self.failUnlessIn("Server Nickname: nickname", s)
4628             self.failUnlessIn("Server Nodeid: %s"  % base32.b2a(server.get_serverid()), s)
4629             self.failUnlessIn("Accepting new shares: Yes", s)
4630         d.addCallback(_check_html)
4631         d.addCallback(lambda ign: self.render_json(w))
4632         def _check_json(json):
4633             data = simplejson.loads(json)
4634             s = data["stats"]
4635             self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4636             self.failUnlessIn("bucket-counter", data)
4637             self.failUnlessIn("lease-checker", data)
4638         d.addCallback(_check_json)
4639         return d