2 import time, os.path, platform, re, simplejson, struct, itertools, urllib
3 from collections import deque
4 from cStringIO import StringIO
8 from twisted.trial import unittest
10 from twisted.internet import defer
11 from twisted.internet.task import Clock
12 from allmydata.util.deferredutil import for_items
13 from twisted.web.iweb import IBodyProducer, UNKNOWN_LENGTH
14 from twisted.web.http_headers import Headers
15 from twisted.protocols.ftp import FileConsumer
16 from twisted.web.client import ResponseDone
18 from twisted.python.failure import Failure
19 from foolscap.logging.log import OPERATIONAL, INFREQUENT, WEIRD
20 from foolscap.logging.web import LogEvent
22 from allmydata import interfaces
23 from allmydata.util.assertutil import precondition
24 from allmydata.util import fileutil, hashutil, base32, time_format
25 from allmydata.storage.server import StorageServer
26 from allmydata.storage.backends.null.null_backend import NullBackend
27 from allmydata.storage.backends.disk.disk_backend import DiskBackend
28 from allmydata.storage.backends.disk.immutable import load_immutable_disk_share, \
29 create_immutable_disk_share, ImmutableDiskShare
30 from allmydata.storage.backends.disk.mutable import create_mutable_disk_share, MutableDiskShare
31 from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
32 from allmydata.storage.backends.cloud.cloud_common import CloudError, CloudServiceError, \
33 ContainerItem, ContainerListing
34 from allmydata.storage.backends.cloud import mock_cloud, cloud_common
35 from allmydata.storage.backends.cloud.mock_cloud import MockContainer
36 from allmydata.storage.backends.cloud.openstack import openstack_container
37 from allmydata.storage.backends.cloud.googlestorage import googlestorage_container
38 from allmydata.storage.bucket import BucketWriter, BucketReader
39 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir
40 from allmydata.storage.leasedb import SHARETYPE_IMMUTABLE, SHARETYPE_MUTABLE
41 from allmydata.storage.expiration import ExpirationPolicy
42 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
44 from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
45 LayoutInvalid, MDMFSIGNABLEHEADER, \
46 SIGNED_PREFIX, MDMFHEADER, \
47 MDMFOFFSETS, SDMFSlotWriteProxy, \
50 VERIFICATION_KEY_SIZE, \
52 from allmydata.interfaces import BadWriteEnablerError, RIStorageServer
53 from allmydata.test.common import LoggingServiceParent, ShouldFailMixin, CrawlerTestMixin, \
55 from allmydata.test.common_util import ReallyEqualMixin
56 from allmydata.test.common_web import WebRenderingMixin
57 from allmydata.test.no_network import NoNetworkServer
58 from allmydata.web.storage import StorageStatus, remove_prefix
# NOTE(review): the enclosing class header and all method bodies are missing
# from this chunk; these look like lease-DB-style account helpers (used via
# FakeAccount below) — confirm against the full file. Signatures kept as-is.
def __init__(self, server):
def add_share(self, storage_index, shnum, used_space, sharetype, commit=True):
def add_or_renew_default_lease(self, storage_index, shnum, commit=True):
def mark_share_as_stable(self, storage_index, shnum, used_space, commit=True):
class FakeStatsProvider:
    # A stand-in stats provider handed to StorageServer (see _make_server).
    # NOTE(review): the method bodies are missing from this chunk — presumably
    # no-op stubs; confirm against the full file.
    def count(self, name, delta=1):
    def register_producer(self, producer):
class ServiceParentMixin:
    # Mixin that gives each test a LoggingServiceParent and a lease-secret
    # counter, and stops the service parent on teardown.
    # NOTE(review): the `def setUp(self):` / `def tearDown(self):` lines are
    # missing from this chunk; the statements below are their bodies.
    self.sparent = LoggingServiceParent()
    self.sparent.startService()
    self._lease_secret = itertools.count()
    return self.sparent.stopService()
def workdir(self, name):
    """Return the per-test working directory 'storage/<TestClassName>/<name>'."""
    test_class = self.__class__.__name__
    return os.path.join("storage", test_class, name)
class BucketTestMixin(WorkdirMixin):
    def make_workdir(self, name):
        """Create <workdir>/tmp and return the (incoming, final) bucket paths."""
        base = self.workdir(name)
        tmp = os.path.join(base, "tmp")
        incoming = os.path.join(tmp, "bucket")
        final = os.path.join(base, "bucket")
        fileutil.make_dirs(tmp)
        return incoming, final
    # NOTE(review): the bodies of these three hooks are missing from this
    # chunk — presumably no-op/counter stubs; confirm against the full file.
    def bucket_writer_closed(self, bw, consumed):
    def add_latency(self, category, latency):
    def count(self, name, delta=1):
class Bucket(BucketTestMixin, unittest.TestCase):
    def test_create(self):
        # Write 82 bytes in four chunks through a BucketWriter and close it.
        # NOTE(review): the trailing `return d2` / `return d` lines appear to
        # be missing from this chunk.
        incoming, final = self.make_workdir("test_create")
        account = FakeAccount(self)
        d = defer.succeed(None)
        d.addCallback(lambda ign: create_immutable_disk_share(incoming, final, allocated_data_length=200,
                                                              storage_index="si1", shnum=0))
        def _got_share(share):
            bw = BucketWriter(account, share, FakeCanary())
            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: bw.remote_write(0, "a"*25))
            d2.addCallback(lambda ign: bw.remote_write(25, "b"*25))
            d2.addCallback(lambda ign: bw.remote_write(50, "c"*25))
            d2.addCallback(lambda ign: bw.remote_write(75, "d"*7))
            d2.addCallback(lambda ign: bw.remote_close())
        d.addCallback(_got_share)
    def test_readwrite(self):
        # Round-trip: write 57 bytes via BucketWriter, read them back via
        # BucketReader.
        # NOTE(review): lines are missing from this chunk — notably the
        # `def _read(ign):` line that the indented reader statements below
        # belong to, and the trailing returns.
        incoming, final = self.make_workdir("test_readwrite")
        account = FakeAccount(self)
        d = defer.succeed(None)
        d.addCallback(lambda ign: create_immutable_disk_share(incoming, final, allocated_data_length=200,
                                                              storage_index="si1", shnum=0))
        def _got_share(share):
            bw = BucketWriter(account, share, FakeCanary())
            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: bw.remote_write(0, "a"*25))
            d2.addCallback(lambda ign: bw.remote_write(25, "b"*25))
            d2.addCallback(lambda ign: bw.remote_write(50, "c"*7)) # last block may be short
            d2.addCallback(lambda ign: bw.remote_close())
                br = BucketReader(account, share)
                d3 = defer.succeed(None)
                d3.addCallback(lambda ign: br.remote_read(0, 25))
                d3.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
                d3.addCallback(lambda ign: br.remote_read(25, 25))
                d3.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
                d3.addCallback(lambda ign: br.remote_read(50, 7))
                d3.addCallback(lambda res: self.failUnlessEqual(res, "c"*7))
            d2.addCallback(_read)
        d.addCallback(_got_share)
    def test_read_past_end_of_share_data(self):
        # Reads that run past the end of share data must be truncated to the
        # share data (never leak the trailing lease/extra bytes).
        # NOTE(review): lines missing from this chunk include the definition
        # of `share_data` (the 'a' payload described below) and the trailing
        # `return d`.
        # test vector for immutable files (hard-coded contents of an immutable share
        containerdata = struct.pack('>LLL', 1, 1, 1)
        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        extra_data = 'b' * ImmutableDiskShare.LEASE_SIZE
        share_file_data = containerdata + share_data + extra_data
        incoming, final = self.make_workdir("test_read_past_end_of_share_data")
        fileutil.write(final, share_file_data)
        d = defer.succeed(None)
        d.addCallback(lambda ign: load_immutable_disk_share(final))
        def _got_share(share):
            mockstorageserver = mock.Mock()
            account = FakeAccount(mockstorageserver)
            br = BucketReader(account, share)
            d2 = br.remote_read(0, len(share_data))
            d2.addCallback(lambda res: self.failUnlessEqual(res, share_data))
            # Read past the end of share data to get the cancel secret.
            read_length = len(share_data) + len(extra_data)
            d2.addCallback(lambda ign: br.remote_read(0, read_length))
            d2.addCallback(lambda res: self.failUnlessEqual(res, share_data))
            # Read past the end of share data by 1 byte.
            d2.addCallback(lambda ign: br.remote_read(0, len(share_data)+1))
            d2.addCallback(lambda res: self.failUnlessEqual(res, share_data))
        d.addCallback(_got_share)
def callRemote(self, methname, *args, **kwargs):
    # Dispatch a fake remote call to the wrapped target's remote_* method.
    # NOTE(review): this chunk is missing lines — in particular the enclosing
    # class header, the `def _call():` wrapper these first statements belong
    # to, and the read-counter branch body; the statement order below reflects
    # the sampled chunk, not runnable code.
    meth = getattr(self.target, "remote_" + methname)
    return meth(*args, **kwargs)
    if methname == "slot_readv":
    if "writev" in methname:
        self.write_count += 1
    return defer.maybeDeferred(_call)
class BucketProxy(BucketTestMixin, unittest.TestCase):
    def make_bucket(self, name, size):
        # Create an immutable share plus a BucketWriter wrapped for remote use.
        # NOTE(review): lines missing from this chunk include the RemoteBucket
        # wrapping of `bw` and the `return` of (bw, rb, sharefile) that
        # _made_bucket callbacks below unpack.
        incoming, final = self.make_workdir(name)
        account = FakeAccount(self)
        d = defer.succeed(None)
        d.addCallback(lambda ign: create_immutable_disk_share(incoming, final, size,
                                                              storage_index="si1", shnum=0))
        def _got_share(share):
            bw = BucketWriter(account, share, FakeCanary())
        d.addCallback(_got_share)
    def test_create(self):
        # A WriteBucketProxy over a fresh bucket provides IStorageBucketWriter.
        # NOTE(review): the WriteBucketProxy argument lines between `None,`
        # and `uri_extension_size_max=...` are missing from this chunk, as is
        # the trailing `return d`.
        d = self.make_bucket("test_create", 500)
        def _made_bucket( (bw, rb, sharefile) ):
            bp = WriteBucketProxy(rb, None,
                                  uri_extension_size_max=500)
            self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp), bp)
        d.addCallback(_made_bucket)
    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Full write-then-read round trip through the given write/read bucket
        # proxy classes; header_size distinguishes v1 (0x24) from v2 (0x44).
        # NOTE(review): lines missing from this chunk include the loop ranges
        # closing the three hash-list comprehensions, `d2 = defer.succeed(...)`
        # in _made_bucket, the RemoteBucket setup in _start_reading, and the
        # `return` statements.
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
        uri_extension = "s" + "E"*498 + "e"
        d = self.make_bucket(name, sharesize)
        def _made_bucket( (bw, rb, sharefile) ):
            bp = wbp_class(rb, None,
                           uri_extension_size_max=len(uri_extension))
            d2.addCallback(lambda ign: bp.put_block(0, "a"*25))
            d2.addCallback(lambda ign: bp.put_block(1, "b"*25))
            d2.addCallback(lambda ign: bp.put_block(2, "c"*25))
            d2.addCallback(lambda ign: bp.put_block(3, "d"*20))
            d2.addCallback(lambda ign: bp.put_crypttext_hashes(crypttext_hashes))
            d2.addCallback(lambda ign: bp.put_block_hashes(block_hashes))
            d2.addCallback(lambda ign: bp.put_share_hashes(share_hashes))
            d2.addCallback(lambda ign: bp.put_uri_extension(uri_extension))
            d2.addCallback(lambda ign: bp.close())
            d2.addCallback(lambda ign: load_immutable_disk_share(sharefile))
        d.addCallback(_made_bucket)
        # now read everything back
        def _start_reading(share):
            br = BucketReader(FakeAccount(self), share)
            server = NoNetworkServer("abc", None)
            rbp = rbp_class(rb, server, storage_index="")
            self.failUnlessIn("to peer", repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: rbp.get_block_data(0, 25, 25))
            d2.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d2.addCallback(lambda ign: rbp.get_block_data(1, 25, 25))
            d2.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d2.addCallback(lambda ign: rbp.get_block_data(2, 25, 25))
            d2.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d2.addCallback(lambda ign: rbp.get_block_data(3, 25, 20))
            d2.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
            d2.addCallback(lambda ign: rbp.get_crypttext_hashes())
            d2.addCallback(lambda res: self.failUnlessEqual(res, crypttext_hashes))
            d2.addCallback(lambda ign: rbp.get_block_hashes(set(range(4))))
            d2.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d2.addCallback(lambda ign: rbp.get_share_hashes())
            d2.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d2.addCallback(lambda ign: rbp.get_uri_extension())
            d2.addCallback(lambda res: self.failUnlessEqual(res, uri_extension))
        d.addCallback(_start_reading)
328 def test_readwrite_v1(self):
329 return self._do_test_readwrite("test_readwrite_v1",
330 0x24, WriteBucketProxy, ReadBucketProxy)
332 def test_readwrite_v2(self):
333 return self._do_test_readwrite("test_readwrite_v2",
334 0x44, WriteBucketProxy_v2, ReadBucketProxy)
class Seek(unittest.TestCase, WorkdirMixin):
    # Sanity-check seek semantics of the underlying filesystem/file objects.
    # NOTE(review): the `def test_seek(self):` line and the seek/write calls
    # that extend the file to 103 bytes are missing from this chunk.
    basedir = self.workdir("test_seek")
    fileutil.make_dirs(basedir)
    filename = os.path.join(basedir, "testfile")
    fileutil.write(filename, "start")
    # mode="w" allows seeking-to-create-holes, but truncates pre-existing
    # files. mode="a" preserves previous contents but does not allow
    # seeking-to-create-holes. mode="r+" allows both.
    f = open(filename, "rb+")
    filelen = os.stat(filename).st_size
    self.failUnlessEqual(filelen, 100+3)
    f2 = open(filename, "rb")
    self.failUnlessEqual(f2.read(5), "start")
class CloudCommon(unittest.TestCase, ShouldFailMixin, WorkdirMixin):
    def test_concat(self):
        """cloud_common.concat flattens a sequence of mixed iterables into one list."""
        pieces = deque([[1, 2], (), xrange(3, 6)])
        self.failUnlessEqual(cloud_common.concat(pieces), [1, 2, 3, 4, 5])
    def test_list_objects_truncated_badly(self):
        # If a container misbehaves by not producing listings with increasing keys,
        # that should cause an incident.
        # NOTE(review): lines missing from this chunk include the definition
        # of the `s` level-tracking dict used below and the trailing return.
        basedir = self.workdir("test_list_objects_truncated_badly")
        fileutil.make_dirs(basedir)
        class BadlyTruncatingMockContainer(MockContainer):
            # Always claims truncation ("true") without advancing the marker.
            def _list_some_objects(self, container_name, prefix='', marker=None):
                contents = [ContainerItem("", None, "", 0, None, None)]
                return defer.succeed(ContainerListing(container_name, "", "", 0, "true", contents))
        def call_log_msg(*args, **kwargs):
            s["level"] = max(s["level"], kwargs["level"])
        self.patch(cloud_common.log, 'msg', call_log_msg)
        container = BadlyTruncatingMockContainer(basedir)
        d = self.shouldFail(AssertionError,
                            'truncated badly', "Not making progress in list_objects",
                            lambda: container.list_objects(prefix=""))
        d.addCallback(lambda ign: self.failUnless(s["level"] >= WEIRD, s["level"]))
    def test_cloud_share_base(self):
        # Exercise the CloudShareBase accessors against a MockContainer.
        basedir = self.workdir("test_cloud_share_base")
        fileutil.make_dirs(basedir)
        container = MockContainer(basedir)
        base = cloud_common.CloudShareBase(container, "si1", 1)
        # Set the size attributes directly for the test (normally filled in
        # elsewhere — confirm against cloud_common).
        base._data_length = 42
        base._total_size = 100
        # "onutc" is the base32 rendering of storage index "si1".
        self.failUnlessIn("CloudShareBase", repr(base))
        self.failUnlessEqual(base.get_storage_index(), "si1")
        self.failUnlessEqual(base.get_storage_index_string(), "onutc")
        self.failUnlessEqual(base.get_shnum(), 1)
        self.failUnlessEqual(base.get_data_length(), 42)
        self.failUnlessEqual(base.get_size(), 100)
        self.failUnlessEqual(os.path.normpath(base._get_path()),
                             os.path.normpath(os.path.join(basedir, "shares", "on", "onutc", "1")))
409 # TODO: test cloud_common.delete_chunks
class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, unittest.TestCase):
    """Tests for the OpenStack cloud container backend, run against a mocked HTTP Agent."""

    PROVIDER = "rackspace.com"
    AUTH_SERVICE_URL = "auth_service_url"
    USERNAME = "username"
    # Restored: API_KEY is referenced by _make_server (written to
    # private/openstack_api_key) and echoed in the canned auth request body,
    # but its definition was dropped from this chunk.
    API_KEY = "api_key"
    CONTAINER = "container"

    PUBLIC_STORAGE_URL = "https://public.storage.example/a"
    INTERNAL_STORAGE_URL = "https://internal.storage.example/a"
    AUTH_TOKEN = "auth_token"

    # Canned share fixture used by test_openstack_container.
    TEST_SHARE_PREFIX = "shares/te/"
    TEST_SHARE_NAME = TEST_SHARE_PREFIX + "test"
    TEST_SHARE_MODIFIED = "2013-02-14T21:30:00Z"
    TEST_SHARE_DATA = "share"
    TEST_SHARE_HASH = "sharehash"
    TEST_LISTING_JSON = ('[{"name": "%s", "bytes": %d, "last_modified": "%s", "hash": "%s"}]'
                         % (TEST_SHARE_NAME, len(TEST_SHARE_DATA), TEST_SHARE_MODIFIED, TEST_SHARE_HASH))
    def _patch_agent(self):
        # Replace twisted.web's Agent in openstack_container with a mock that
        # serves canned responses registered via _set_request.
        # NOTE(review): lines missing from this chunk include the
        # `self._requests = {}` initialization, MockAgent.__init__'s body,
        # the `else:` branch header and `body = StringIO()` in request(),
        # and the final `return d`.
        class MockResponse(object):
            def __init__(mock_self, response_code, response_phrase, response_headers, response_body):
                mock_self.code = response_code
                mock_self.phrase = response_phrase
                mock_self.headers = Headers(response_headers)
                mock_self._body = response_body
            def deliverBody(mock_self, protocol):
                # Push the canned body and signal a clean completion.
                protocol.dataReceived(mock_self._body)
                protocol.connectionLost(Failure(ResponseDone()))
        class MockAgent(object):
            def __init__(mock_self, reactor):
            def request(mock_self, method, url, headers, bodyProducer=None):
                self.failUnlessIn((method, url), self._requests)
                (expected_headers, expected_body,
                 response_code, response_phrase, response_headers, response_body) = self._requests[(method, url)]
                self.failUnlessIsInstance(headers, Headers)
                for (key, values) in expected_headers.iteritems():
                    self.failUnlessEqual(headers.getRawHeaders(key), values, str((headers, key)))
                d = defer.succeed(None)
                if bodyProducer is None:
                    self.failUnlessEqual(expected_body, "")
                    # (the following statements belong to the missing else: branch)
                    self.failUnless(IBodyProducer.providedBy(bodyProducer))
                    d = bodyProducer.startProducing(FileConsumer(body))
                    d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body))
                    d.addCallback(lambda ign: self.failUnlessIn(bodyProducer.length,
                                                                (len(expected_body), UNKNOWN_LENGTH)))
                d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body))
        self.patch(openstack_container, 'Agent', MockAgent)
472 def _set_request(self, method, url, expected_headers, expected_body,
473 response_code, response_phrase, response_headers, response_body):
474 precondition(isinstance(expected_headers, dict), expected_headers)
475 precondition(isinstance(response_headers, dict), response_headers)
476 self._requests[(method, url)] = (expected_headers, expected_body,
477 response_code, response_phrase, response_headers, response_body)
    def _make_server(self, name):
        # Build a StorageServer over a CloudBackend whose OpenStack container
        # talks to the mocked agent; registers the canned v2 auth exchange.
        # NOTE(review): lines missing from this chunk include the response
        # status/`'''` opener of the auth response body, the
        # `storage_config = {` opener, and surrounding blank lines.
        # This is for the v1 auth protocol.
        #self._set_request('GET', self.AUTH_SERVICE_URL, {
        #    'X-Auth-User': [self.USERNAME],
        #    'X-Auth-Key': [self.API_KEY],
        #    204, "No Content", {
        #    'X-Storage-Url': [self.STORAGE_URL],
        #    'X-Auth-Token': [self.AUTH_TOKEN],
        self._set_request('POST', self.AUTH_SERVICE_URL, {
            'Content-Type': ['application/json'],
        }, '{"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": "username", "apiKey": "api_key"}}}',
            {"access": {"token": {"id": "%s"},
            "serviceCatalog": [{"endpoints": [{"region": "FOO", "publicURL": "%s", "internalURL": "%s"}],
            "type": "object-store"}],
            "user": {"RAX-AUTH:defaultRegion": "", "name": "%s"}
        }''' % (self.AUTH_TOKEN, self.PUBLIC_STORAGE_URL, self.INTERNAL_STORAGE_URL, self.USERNAME))
            'openstack.provider': self.PROVIDER,
            'openstack.url': self.AUTH_SERVICE_URL,
            'openstack.username': self.USERNAME,
            'openstack.container': self.CONTAINER,
        from allmydata.node import _None
        class MockConfig(object):
            # Minimal stand-in for the node config read by the backend factory.
            def get_config(mock_self, section, option, default=_None, boolean=False):
                self.failUnlessEqual(section, "storage")
                self.failUnlessIn(option, storage_config)
                return storage_config.get(option, default)
            def get_private_config(mock_self, filename):
                return fileutil.read(os.path.join(privatedir, filename))
        self.workdir = self.workdir(name)
        privatedir = os.path.join(self.workdir, "private")
        fileutil.make_dirs(privatedir)
        fileutil.write(os.path.join(privatedir, "openstack_api_key"), self.API_KEY)
        self.config = MockConfig()
        self.container = openstack_container.configure_openstack_container(self.workdir, self.config)
        backend = CloudBackend(self.container)
        self.server = StorageServer("\x00" * 20, backend, self.workdir,
                                    stats_provider=FakeStatsProvider(), clock=self.clock)
        self.server.setServiceParent(self.sparent)
        self.failUnless(self.server.backend._container is self.container,
                        (self.server.backend._container, self.container))
    def _shutdown(self, res):
        # avoid unclean reactor error
        # NOTE(review): used via d.addBoth(self._shutdown); a trailing
        # `return res` pass-through appears to be missing from this chunk —
        # confirm against the full file.
        self.container._auth_client.shutdown()
    def test_authentication_client(self):
        # The auth client parses the canned Rackspace v2 response into
        # (public URL, internal URL, token).
        # NOTE(review): surrounding lines (including the trailing `return d`)
        # are missing from this chunk.
        self._make_server("test_authentication_client")
        d = self.container._auth_client.get_auth_info()
        def _check(auth_info):
            self.failUnlessEqual(auth_info.public_storage_url, self.PUBLIC_STORAGE_URL)
            self.failUnlessEqual(auth_info.internal_storage_url, self.INTERNAL_STORAGE_URL)
            self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN)
        d.addCallback(_check)
        d.addBoth(self._shutdown)
    def test_openstack_container(self):
        # End-to-end container operations (get/put/list/delete) against the
        # mocked agent and canned responses.
        # NOTE(review): lines missing from this chunk include the dict-literal
        # closers (`},`) of each _set_request header dict, the empty-listing
        # response in _set_up_delete, blank lines, and the trailing return.
        # Set up the requests that we expect to receive.
        self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, "unexpected")), {
            'X-Auth-Token': [self.AUTH_TOKEN],
            404, "Not Found", {}, "")
        self._set_request('PUT', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
            'X-Auth-Token': [self.AUTH_TOKEN],
            'Content-Type': ['application/octet-stream'],
            #'Content-Length': [len(self.TEST_SHARE_DATA)],
        }, self.TEST_SHARE_DATA,
            204, "No Content", {}, "")
        quoted_prefix = urllib.quote(self.TEST_SHARE_PREFIX, safe='')
        self._set_request('GET', "%s/%s?format=json&prefix=%s"
                          % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
            'X-Auth-Token': [self.AUTH_TOKEN],
            200, "OK", {}, self.TEST_LISTING_JSON)
        self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
            'X-Auth-Token': [self.AUTH_TOKEN],
            200, "OK", {}, self.TEST_SHARE_DATA)
        self._make_server("test_openstack_container")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self.shouldFail(CloudError, "404", None,
                                                  self.container.get_object, "unexpected"))
        d.addCallback(lambda ign: self.container.put_object(self.TEST_SHARE_NAME, self.TEST_SHARE_DATA))
        d.addCallback(lambda res: self.failUnless(res is None, res))
        d.addCallback(lambda ign: self.container.list_objects(prefix=self.TEST_SHARE_PREFIX))
        def _check_listing(listing):
            self.failUnlessEqual(listing.name, self.CONTAINER)
            self.failUnlessEqual(listing.prefix, self.TEST_SHARE_PREFIX)
            self.failUnlessEqual(listing.is_truncated, "false")
            self.failUnlessEqual(len(listing.contents), 1)
            item = listing.contents[0]
            self.failUnlessEqual(item.key, self.TEST_SHARE_NAME)
            self.failUnlessEqual(item.modification_date, self.TEST_SHARE_MODIFIED)
            self.failUnlessEqual(item.etag, self.TEST_SHARE_HASH)
            self.failUnlessEqual(item.size, len(self.TEST_SHARE_DATA))
        d.addCallback(_check_listing)
        d.addCallback(lambda ign: self.container.get_object(self.TEST_SHARE_NAME))
        d.addCallback(lambda res: self.failUnlessEqual(res, self.TEST_SHARE_DATA))
        def _set_up_delete(ign):
            self._set_request('DELETE', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
                'X-Auth-Token': [self.AUTH_TOKEN],
                204, "No Content", {}, "")
            # this changes the response to the request set up above
            self._set_request('GET', "%s/%s?format=json&prefix=%s"
                              % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
                'X-Auth-Token': [self.AUTH_TOKEN],
        d.addCallback(_set_up_delete)
        d.addCallback(lambda ign: self.container.delete_object(self.TEST_SHARE_NAME))
        d.addCallback(lambda res: self.failUnless(res is None, res))
        d.addCallback(lambda ign: self.container.list_objects(prefix=self.TEST_SHARE_PREFIX))
        def _check_listing_after_delete(listing):
            self.failUnlessEqual(listing.name, self.CONTAINER)
            self.failUnlessEqual(listing.prefix, self.TEST_SHARE_PREFIX)
            self.failUnlessEqual(listing.is_truncated, "false")
            self.failUnlessEqual(len(listing.contents), 0)
        d.addCallback(_check_listing_after_delete)
        d.addBoth(self._shutdown)
class GoogleStorageBackend(ShouldFailMixin, unittest.TestCase):
    """
    Tests for the Google Storage API backend.

    All code references in docstrings/comments are to classes/functions in
    allmydata.storage.backends.cloud.googlestorage.googlestorage_container
    unless noted otherwise.
    """
    # Restored the triple-quote delimiters around this class docstring; the
    # delimiter lines were dropped from this chunk.
644 def test_authentication_credentials(self):
646 AuthenticationClient.get_authorization_header() initializes a
647 SignedJwtAssertionCredentials with the correct parameters.
649 # Somewhat fragile tests, but better than nothing.
650 auth = googlestorage_container.AuthenticationClient("u@example.com", "xxx123")
651 self.assertEqual(auth._credentials.service_account_name, "u@example.com")
652 self.assertEqual(auth._credentials.private_key, "xxx123".encode("base64").strip())
654 def test_authentication_initial(self):
656 When AuthenticationClient() is created, it refreshes its access token.
658 from oauth2client.client import SignedJwtAssertionCredentials
659 auth = googlestorage_container.AuthenticationClient(
660 "u@example.com", "xxx123",
661 _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
662 _deferToThread=defer.maybeDeferred)
663 self.assertEqual(auth._credentials.refresh.call_count, 1)
665 def test_authentication_expired(self):
667 AuthenticationClient.get_authorization_header() refreshes its
668 credentials if the access token has expired.
670 from oauth2client.client import SignedJwtAssertionCredentials
671 auth = googlestorage_container.AuthenticationClient(
672 "u@example.com", "xxx123",
673 _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
674 _deferToThread=defer.maybeDeferred)
675 auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
676 auth._credentials.access_token_expired = True
677 auth.get_authorization_header()
678 self.assertEqual(auth._credentials.refresh.call_count, 2)
680 def test_authentication_no_refresh(self):
682 AuthenticationClient.get_authorization_header() does not refresh its
683 credentials if the access token has not expired.
685 from oauth2client.client import SignedJwtAssertionCredentials
686 auth = googlestorage_container.AuthenticationClient(
687 "u@example.com", "xxx123",
688 _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
689 _deferToThread=defer.maybeDeferred)
690 auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
691 auth._credentials.access_token_expired = False
692 auth.get_authorization_header()
693 self.assertEqual(auth._credentials.refresh.call_count, 1)
695 def test_authentication_header(self):
697 AuthenticationClient.get_authorization_header() returns a value to be
698 used for the Authorization header.
700 from oauth2client.client import SignedJwtAssertionCredentials
701 class NoNetworkCreds(SignedJwtAssertionCredentials):
702 def refresh(self, http):
703 self.access_token = "xxx"
704 auth = googlestorage_container.AuthenticationClient(
705 "u@example.com", "xxx123",
706 _credentialsClass=NoNetworkCreds,
707 _deferToThread=defer.maybeDeferred)
709 auth.get_authorization_header().addCallback(result.append)
710 self.assertEqual(result, ["Bearer xxx"])
    def test_authentication_one_refresh(self):
        # NOTE(review): the docstring delimiters and several lines are missing
        # from this chunk — including whatever fires the first Deferred before
        # the final assertion; statements kept as-is.
        AuthenticationClient._refresh_if_necessary() only runs one refresh
        # The second call shouldn't happen until the first Deferred fires!
        results = [defer.Deferred(), defer.succeed(None)]
        def fakeDeferToThread(f, *args):
            # Hand back the pre-built results instead of running in a thread.
            return results.pop(0)
        from oauth2client.client import SignedJwtAssertionCredentials
        auth = googlestorage_container.AuthenticationClient(
            "u@example.com", "xxx123",
            _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
            _deferToThread=fakeDeferToThread)
        # Initial authorization call happens...
        self.assertEqual(len(results), 1)
        # ... and still isn't finished, so next one doesn't run yet:
        auth._refresh_if_necessary(force=True)
        self.assertEqual(len(results), 1)
        # When first one finishes, second one can run:
        self.assertEqual(len(results), 0)
738 def test_authentication_refresh_call(self):
740 AuthenticationClient._refresh_if_necessary() runs the
741 authentication refresh in a thread, since it blocks, with a
742 httplib2.Http instance.
744 from httplib2 import Http
745 from oauth2client.client import SignedJwtAssertionCredentials
746 class NoNetworkCreds(SignedJwtAssertionCredentials):
747 def refresh(cred_self, http):
748 cred_self.access_token = "xxx"
749 self.assertIsInstance(http, Http)
750 self.thread = thread.get_ident()
751 auth = googlestorage_container.AuthenticationClient(
752 "u@example.com", "xxx123",
753 _credentialsClass=NoNetworkCreds)
755 def gotResult(ignore):
756 self.assertNotEqual(thread.get_ident(), self.thread)
757 return auth.get_authorization_header().addCallback(gotResult)
def allocate(self, account, storage_index, sharenums, size, canary=None):
    # Helper: drive remote_allocate_buckets with generated secrets.
    # These secrets are not used, but clients still provide them.
    renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
    cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
    # NOTE(review): a line is missing here — presumably a guard such as
    # `if not canary:` so FakeCanary() is only the default; confirm against
    # the full file.
    canary = FakeCanary()
    return defer.maybeDeferred(account.remote_allocate_buckets,
                               storage_index, renew_secret, cancel_secret,
                               sharenums, size, canary)
def _write_and_close(self, ign, i, bw):
    """Write a single 25-char record identifying writer `i`, then close `bw`.

    Restored the trailing `return d` (dropped from this chunk) so callers
    such as for_items() can wait on the write/close chain.
    """
    d = defer.succeed(None)
    d.addCallback(lambda ign: bw.remote_write(0, "%25d" % i))
    d.addCallback(lambda ign: bw.remote_close())
    return d
777 def _close_writer(self, ign, i, bw):
778 return bw.remote_close()
780 def _abort_writer(self, ign, i, bw):
781 return bw.remote_abort()
class ServerTest(ServerMixin, ShouldFailMixin):
    def test_create(self):
        """A freshly-created server's anonymous account provides RIStorageServer."""
        storage_server = self.create("test_create")
        anonymous = storage_server.get_accountant().get_anonymous_account()
        self.failUnless(RIStorageServer.providedBy(anonymous), anonymous)
790 def test_declares_fixed_1528(self):
791 server = self.create("test_declares_fixed_1528")
792 aa = server.get_accountant().get_anonymous_account()
794 ver = aa.remote_get_version()
795 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
796 self.failUnless(sv1.get('prevents-read-past-end-of-share-data'), sv1)
798 def test_has_immutable_readv(self):
799 server = self.create("test_has_immutable_readv")
800 aa = server.get_accountant().get_anonymous_account()
802 ver = aa.remote_get_version()
803 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
804 self.failUnless(sv1.get('has-immutable-readv'), sv1)
806 # TODO: test that we actually support it
808 def test_declares_maximum_share_sizes(self):
809 server = self.create("test_declares_maximum_share_sizes")
810 aa = server.get_accountant().get_anonymous_account()
812 ver = aa.remote_get_version()
813 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
814 self.failUnlessIn('maximum-immutable-share-size', sv1)
815 self.failUnlessIn('maximum-mutable-share-size', sv1)
817 def test_declares_available_space(self):
818 ss = self.create("test_declares_available_space")
819 ver = ss.remote_get_version()
820 sv1 = ver['http://allmydata.org/tahoe/protocols/storage/v1']
821 self.failUnlessIn('available-space', sv1)
    def test_create_share(self):
        # Allocate one immutable share, write and close it, then check the
        # backend reports exactly one readable, uncorrupted share.
        # NOTE(review): the trailing `return d2` / `return d` lines appear to
        # be missing from this chunk.
        server = self.create("test_create_share")
        backend = server.backend
        aa = server.get_accountant().get_anonymous_account()
        d = self.allocate(aa, "si1", [0], 75)
        def _allocated( (already, writers) ):
            self.failUnlessEqual(already, set())
            self.failUnlessEqual(set(writers.keys()), set([0]))
            d2 = defer.succeed(None)
            d2.addCallback(lambda ign: writers[0].remote_write(0, "data"))
            d2.addCallback(lambda ign: writers[0].remote_close())
            d2.addCallback(lambda ign: backend.get_shareset("si1").get_share(0))
            d2.addCallback(lambda share: self.failUnless(interfaces.IShareForReading.providedBy(share)))
            d2.addCallback(lambda ign: backend.get_shareset("si1").get_shares())
            def _check( (shares, corrupted) ):
                self.failUnlessEqual(len(shares), 1, str(shares))
                self.failUnlessEqual(len(corrupted), 0, str(corrupted))
            d2.addCallback(_check)
        d.addCallback(_allocated)
    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        share lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        # NOTE(review): restored the docstring delimiters; still missing from
        # this chunk are the continuation of the storedir os.path.join(...)
        # call and the `return d` lines.
        server = self.create("test_dont_overfill_dirs")
        aa = server.get_accountant().get_anonymous_account()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
        def _write_and_get_children( (already, writers) ):
            d = for_items(self._write_and_close, writers)
            d.addCallback(lambda ign: sorted(fileutil.listdir(storedir)))
        d = self.allocate(aa, "storageindex", [0], 25)
        d.addCallback(_write_and_get_children)
        def _got_children(children_of_storedir):
            # Now store another one under another storageindex that has leading
            # chars the same as the first storageindex.
            d2 = self.allocate(aa, "storageindey", [0], 25)
            d2.addCallback(_write_and_get_children)
            d2.addCallback(lambda res: self.failUnlessEqual(res, children_of_storedir))
        d.addCallback(_got_children)
# Disabled test (the OFF_ prefix keeps trial from collecting it): a
# synchronous-style walk through immutable bucket allocation — open buckets
# are not readable, closed buckets are, already-present shares are reported
# on re-allocation, and aborting writers releases their claims.
878 def OFF_test_allocate(self):
879 server = self.create("test_allocate")
880 aa = server.get_accountant().get_anonymous_account()
882 self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})
884 already,writers = self.allocate(aa, "allocate", [0,1,2], 75)
885 self.failUnlessEqual(already, set())
886 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
888 # while the buckets are open, they should not count as readable
889 self.failUnlessEqual(aa.remote_get_buckets("allocate"), {})
892 for i,wb in writers.items():
893 wb.remote_write(0, "%25d" % i)
895 # aborting a bucket that was already closed is a no-op
898 # now they should be readable
899 b = aa.remote_get_buckets("allocate")
900 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
901 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
# "mfwgy33dmf2g" is the base32 encoding of the storage index; presumably
# checked against repr(BucketReader) — confirm b_str's origin (elided here).
903 self.failUnlessIn("BucketReader", b_str)
904 self.failUnlessIn("mfwgy33dmf2g 0", b_str)
906 # now if we ask about writing again, the server should offer those
907 # three buckets as already present. It should offer them even if we
908 # don't ask about those specific ones.
909 already,writers = self.allocate(aa, "allocate", [2,3,4], 75)
910 self.failUnlessEqual(already, set([0,1,2]))
911 self.failUnlessEqual(set(writers.keys()), set([3,4]))
913 # while those two buckets are open for writing, the server should
914 # refuse to offer them to uploaders
916 already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
917 self.failUnlessEqual(already2, set([0,1,2]))
918 self.failUnlessEqual(set(writers2.keys()), set([5]))
920 # aborting the writes should remove the tempfiles
921 for i,wb in writers2.items():
# After the aborts, the same allocation request behaves identically.
923 already2,writers2 = self.allocate(aa, "allocate", [2,3,4,5], 75)
924 self.failUnlessEqual(already2, set([0,1,2]))
925 self.failUnlessEqual(set(writers2.keys()), set([5]))
927 for i,wb in writers2.items():
929 for i,wb in writers.items():
# Class-level constants reconstructing, byte for byte, an immutable share
# file as written by Tahoe-LAFS v1.8.2, for the backward-compatibility test
# test_read_old_share. The names referenced below but not defined here
# (shareinputdata, renewsecret, nextlease) are presumably sibling constants
# on lines elided from this view — confirm against the full file.
932 # The following share file content was generated with
933 # storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
934 # with share data == 'a'. The total size of this input
936 shareversionnumber = '\x00\x00\x00\x01'
937 sharedatalength = '\x00\x00\x00\x01'
938 numberofleases = '\x00\x00\x00\x01'
940 ownernumber = '\x00\x00\x00\x00'
942 cancelsecret = 'y'*32
943 expirationtime = '\x00(\xde\x80'
# Split between the container header and the client-supplied portion.
945 containerdata = shareversionnumber + sharedatalength + numberofleases
946 client_data = (shareinputdata + ownernumber + renewsecret +
947 cancelsecret + expirationtime + nextlease)
948 share_data = containerdata + client_data
949 testnodeid = 'testnodeidxxxxxxxxxx'
# White-box test of the immutable-share upload lifecycle: the share sits in
# "incoming" while open, a concurrent allocation of the same sharenum yields
# no writer, and only remote_close() moves the data into the final shareset.
951 def test_write_and_read_share(self):
953 Write a new share, read it, and test the server and backends'
954 handling of simultaneous and successive attempts to write the same
957 server = self.create("test_write_and_read_share")
958 aa = server.get_accountant().get_anonymous_account()
959 canary = FakeCanary()
961 shareset = server.backend.get_shareset('teststorage_index')
962 self.failIf(shareset.has_incoming(0))
964 # Populate incoming with the sharenum: 0.
965 d = aa.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, canary)
966 def _allocated( (already, writers) ):
967 # This is a white-box test: Inspect incoming and fail unless the sharenum: 0 is listed there.
968 self.failUnless(shareset.has_incoming(0))
970 # Attempt to create a second share writer with the same sharenum.
971 d2 = aa.remote_allocate_buckets('teststorage_index', 'x'*32, 'y'*32, frozenset((0,)), 1, canary)
973 # Show that no sharewriter results from a remote_allocate_buckets
974 # with the same si and sharenum, until BucketWriter.remote_close()
976 d2.addCallback(lambda (already2, writers2): self.failIf(writers2))
978 # Test allocated size.
979 d2.addCallback(lambda ign: server.allocated_size())
980 d2.addCallback(lambda space: self.failUnlessEqual(space, 1))
982 # Write 'a' to shnum 0. Only tested together with close and read.
983 d2.addCallback(lambda ign: writers[0].remote_write(0, 'a'))
985 # Preclose: Inspect final, failUnless nothing there.
986 d2.addCallback(lambda ign: server.backend.get_shareset('teststorage_index').get_shares())
987 def _check( (shares, corrupted) ):
988 self.failUnlessEqual(len(shares), 0, str(shares))
989 self.failUnlessEqual(len(corrupted), 0, str(corrupted))
990 d2.addCallback(_check)
992 d2.addCallback(lambda ign: writers[0].remote_close())
994 # Postclose: fail unless written data is in final.
995 d2.addCallback(lambda ign: server.backend.get_shareset('teststorage_index').get_shares())
996 def _got_shares( (sharesinfinal, corrupted) ):
997 self.failUnlessEqual(len(sharesinfinal), 1, str(sharesinfinal))
998 self.failUnlessEqual(len(corrupted), 0, str(corrupted))
1000 d3 = defer.succeed(None)
# 73 bytes covers the whole v1.8.2-format client data; compare to the
# class-level reference constant.
1001 d3.addCallback(lambda ign: sharesinfinal[0].read_share_data(0, 73))
1002 d3.addCallback(lambda contents: self.failUnlessEqual(contents, self.shareinputdata))
1004 d2.addCallback(_got_shares)
1006 # Exercise the case that the share we're asking to allocate is
1007 # already (completely) uploaded.
1008 d2.addCallback(lambda ign: aa.remote_allocate_buckets('teststorage_index',
1009 'x'*32, 'y'*32, set((0,)), 1, canary))
1011 d.addCallback(_allocated)
# Backward-compatibility: plant a pre-pluggable-backends (<= v1.8.2) share
# file directly on disk and verify the server finds and reads it correctly,
# including reads past the end of the data.
1014 def test_read_old_share(self):
1016 This tests whether the code correctly finds and reads shares written out by
1017 pre-pluggable-backends (Tahoe-LAFS <= v1.8.2) servers. There is a similar test
1018 in test_download, but that one is from the perspective of the client and exercises
1019 a deeper stack of code. This one is for exercising just the StorageServer and backend.
1021 server = self.create("test_read_old_share")
1022 aa = server.get_accountant().get_anonymous_account()
1024 # Construct a file with the appropriate contents.
1025 datalen = len(self.share_data)
1026 sharedir = server.backend.get_shareset('teststorage_index')._get_sharedir()
1027 fileutil.make_dirs(sharedir)
1028 fileutil.write(os.path.join(sharedir, "0"), self.share_data)
1030 # Now begin the test.
1031 d = aa.remote_get_buckets('teststorage_index')
1032 def _got_buckets(bs):
1033 self.failUnlessEqual(len(bs), 1)
1034 self.failUnlessIn(0, bs)
1037 d2 = defer.succeed(None)
1038 d2.addCallback(lambda ign: b.remote_read(0, datalen))
1039 d2.addCallback(lambda res: self.failUnlessEqual(res, self.shareinputdata))
1041 # If you try to read past the end you get as much input data as is there.
1042 d2.addCallback(lambda ign: b.remote_read(0, datalen+20))
1043 d2.addCallback(lambda res: self.failUnlessEqual(res, self.shareinputdata))
1045 # If you start reading past the end of the file you get the empty string.
1046 d2.addCallback(lambda ign: b.remote_read(datalen+1, 3))
1047 d2.addCallback(lambda res: self.failUnlessEqual(res, ''))
1049 d.addCallback(_got_buckets)
# Corrupt a share's container-version field and verify the server silently
# skips the corrupted share (ticket #1566) instead of failing the request,
# both when good shares remain and when only corrupted shares exist.
1052 def test_bad_container_version(self):
1053 server = self.create("test_bad_container_version")
1054 aa = server.get_accountant().get_anonymous_account()
1056 d = self.allocate(aa, "allocate", [0,1], 20)
1057 def _allocated( (already, writers) ):
1058 d2 = defer.succeed(None)
1059 d2.addCallback(lambda ign: writers[0].remote_write(0, "\xff"*10))
1060 d2.addCallback(lambda ign: writers[0].remote_close())
1061 d2.addCallback(lambda ign: writers[1].remote_write(1, "\xaa"*10))
1062 d2.addCallback(lambda ign: writers[1].remote_close())
1064 d.addCallback(_allocated)
1066 d.addCallback(lambda ign: server.backend.get_shareset("allocate").get_share(0))
# Overwrite the 4-byte big-endian version field at the start of the file.
1067 def _write_invalid_version(share0):
1068 f = open(share0._get_path(), "rb+")
1071 f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
1074 d.addCallback(_write_invalid_version)
1076 # This should ignore the corrupted share; see ticket #1566.
1077 d.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
1078 d.addCallback(lambda b: self.failUnlessEqual(set(b.keys()), set([1])))
1080 # Also if there are only corrupted shares.
1081 d.addCallback(lambda ign: server.backend.get_shareset("allocate").get_share(1))
1082 d.addCallback(lambda share: share.unlink())
1083 d.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
1084 d.addCallback(lambda b: self.failUnlessEqual(b, {}))
# Corruption advisories: both the server-level remote_advise_corrupt_share
# and the per-bucket RIBucketWriter variant must write a report file into
# <statedir>/corruption-advisories containing the advisory details.
1087 def test_advise_corruption(self):
1088 server = self.create("test_advise_corruption")
1089 aa = server.get_accountant().get_anonymous_account()
1091 si0_s = base32.b2a("si0")
1092 aa.remote_advise_corrupt_share("immutable", "si0", 0,
1093 "This share smells funny.\n")
1094 reportdir = os.path.join(server._statedir, "corruption-advisories")
1095 self.failUnless(os.path.exists(reportdir), reportdir)
1096 reports = fileutil.listdir(reportdir)
1097 self.failUnlessEqual(len(reports), 1)
1098 report_si0 = reports[0]
# The report filename embeds the base32 storage index.
1099 self.failUnlessIn(si0_s, str(report_si0))
1100 report = fileutil.read(os.path.join(reportdir, report_si0))
1102 self.failUnlessIn("type: immutable", report)
1103 self.failUnlessIn("storage_index: %s" % si0_s, report)
1104 self.failUnlessIn("share_number: 0", report)
1105 self.failUnlessIn("This share smells funny.", report)
1107 # test the RIBucketWriter version too
1108 si1_s = base32.b2a("si1")
1109 d = self.allocate(aa, "si1", [1], 75)
1110 def _allocated( (already, writers) ):
1111 self.failUnlessEqual(already, set())
1112 self.failUnlessEqual(set(writers.keys()), set([1]))
1114 d2 = defer.succeed(None)
1115 d2.addCallback(lambda ign: writers[1].remote_write(0, "data"))
1116 d2.addCallback(lambda ign: writers[1].remote_close())
1118 d2.addCallback(lambda ign: aa.remote_get_buckets("si1"))
1119 def _got_buckets(b):
1120 self.failUnlessEqual(set(b.keys()), set([1]))
1121 b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
# A second report now exists; find the one for si1 by filename.
1123 reports = fileutil.listdir(reportdir)
1124 self.failUnlessEqual(len(reports), 2)
1125 report_si1 = [r for r in reports if si1_s in r][0]
1126 report = fileutil.read(os.path.join(reportdir, report_si1))
1128 self.failUnlessIn("type: immutable", report)
1129 self.failUnlessIn("storage_index: %s" % (si1_s,), report)
1130 self.failUnlessIn("share_number: 1", report)
1131 self.failUnlessIn("This share tastes like dust.", report)
1132 d2.addCallback(_got_buckets)
1134 d.addCallback(_allocated)
# Assert two lease lists are pairwise equal on owner and (presumably only
# when with_timestamps is true — the conditional is elided from this view)
# on renewal/expiration times.
1137 def compare_leases(self, leases_a, leases_b, with_timestamps=True):
1138 self.failUnlessEqual(len(leases_a), len(leases_b))
1139 for i in range(len(leases_a)):
1142 self.failUnlessEqual(a.owner_num, b.owner_num)
1144 self.failUnlessEqual(a.renewal_time, b.renewal_time)
1145 self.failUnlessEqual(a.expiration_time, b.expiration_time)
# Disabled test (OFF_ prefix): lease accounting for immutable shares via the
# anonymous and starter accounts — one lease per share on allocation, renewal
# keeping the count stable, and a second account getting its own lease set.
# NOTE(review): `size` and `b` usages suggest definitions elided from this view.
1147 def OFF_test_immutable_leases(self):
1148 server = self.create("test_immutable_leases")
1149 aa = server.get_accountant().get_anonymous_account()
1150 sa = server.get_accountant().get_starter_account()
1152 canary = FakeCanary()
1153 sharenums = range(5)
1156 # create a random non-numeric file in the bucket directory, to
1157 # exercise the code that's supposed to ignore those.
1158 bucket_dir = os.path.join(self.workdir("test_leases"),
1159 "shares", storage_index_to_dir("six"))
1160 os.makedirs(bucket_dir)
1161 fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
1162 "you ought to be ignoring me\n")
1164 already,writers = aa.remote_allocate_buckets("si1", "", "",
1165 sharenums, size, canary)
1166 self.failUnlessEqual(len(already), 0)
1167 self.failUnlessEqual(len(writers), 5)
1168 for wb in writers.values():
# Closing all five writers yields one lease per share.
1171 leases = aa.get_leases("si1")
1172 self.failUnlessEqual(len(leases), 5)
1174 aa.add_share("six", 0, 0, SHARETYPE_IMMUTABLE)
1175 # adding a share does not immediately add a lease
1176 self.failUnlessEqual(len(aa.get_leases("six")), 0)
1178 aa.add_or_renew_default_lease("six", 0)
1179 self.failUnlessEqual(len(aa.get_leases("six")), 1)
1181 # add-lease on a missing storage index is silently ignored
1182 self.failUnlessEqual(aa.remote_add_lease("si18", "", ""), None)
1183 self.failUnlessEqual(len(aa.get_leases("si18")), 0)
1185 all_leases = aa.get_leases("si1")
1187 # renew the lease directly
1188 aa.remote_renew_lease("si1", "")
1189 self.failUnlessEqual(len(aa.get_leases("si1")), 5)
1190 self.compare_leases(all_leases, aa.get_leases("si1"), with_timestamps=False)
1192 # Now allocate more leases using a different account.
1193 # A new lease should be allocated for every share in the shareset.
1194 sa.remote_renew_lease("si1", "")
1195 self.failUnlessEqual(len(aa.get_leases("si1")), 5)
1196 self.failUnlessEqual(len(sa.get_leases("si1")), 5)
1198 all_leases2 = sa.get_leases("si1")
1200 sa.remote_renew_lease("si1", "")
1201 self.compare_leases(all_leases2, sa.get_leases("si1"), with_timestamps=False)
1204 class MutableServerMixin:
def write_enabler(self, we_tag):
    """Derive the deterministic write-enabler secret for *we_tag*."""
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Derive the lease-renewal secret corresponding to *tag*."""
    tag_bytes = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_bytes)
def cancel_secret(self, tag):
    """Derive the lease-cancel secret corresponding to *tag*."""
    tag_bytes = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_bytes)
# Create empty mutable slots for *sharenums* under *storage_index* by issuing
# an empty test-and-write vector per share, and assert the write succeeded
# with no read data returned.
1214 def allocate(self, aa, storage_index, we_tag, sharenums, size):
1215 write_enabler = self.write_enabler(we_tag)
1217 # These secrets are not used, but clients still provide them.
1218 lease_tag = "%d" % (self._lease_secret.next(),)
1219 renew_secret = self.renew_secret(lease_tag)
1220 cancel_secret = self.cancel_secret(lease_tag)
1222 rstaraw = aa.remote_slot_testv_and_readv_and_writev
# Empty testv and writev with new_length=None: creates the share untouched.
1223 testandwritev = dict( [ (shnum, ([], [], None) )
1224 for shnum in sharenums ] )
1227 d = defer.succeed(None)
1228 d.addCallback(lambda ign: rstaraw(storage_index,
1229 (write_enabler, renew_secret, cancel_secret),
# rstaraw resolves to (did_write, readv_data); no reads were requested.
1232 def _check( (did_write, readv_data) ):
1233 self.failUnless(did_write)
1234 self.failUnless(isinstance(readv_data, dict))
1235 self.failUnlessEqual(len(readv_data), 0)
1236 d.addCallback(_check)
1240 class MutableServerTest(MutableServerMixin, ShouldFailMixin):
def test_create(self):
    """The anonymous account of a freshly created server provides RIStorageServer."""
    storage_server = self.create("test_create")
    accountant = storage_server.get_accountant()
    anonymous = accountant.get_anonymous_account()
    self.failUnless(RIStorageServer.providedBy(anonymous), anonymous)
# Overwrite a mutable share's magic header and verify reads silently skip
# the corrupted share (ticket #1566), returning {} when no valid share is left.
1246 def test_bad_magic(self):
1247 server = self.create("test_bad_magic")
1248 aa = server.get_accountant().get_anonymous_account()
1249 read = aa.remote_slot_readv
1251 d = self.allocate(aa, "si1", "we1", set([0,1]), 10)
1252 d.addCallback(lambda ign: server.backend.get_shareset("si1").get_share(0))
# Clobber the magic string at the start of share 0's file on disk.
1253 def _got_share(share0):
1254 f = open(share0._get_path(), "rb+")
1257 f.write("BAD MAGIC")
1260 d.addCallback(_got_share)
1262 # This should ignore the corrupted share; see ticket #1566.
1263 d.addCallback(lambda ign: read("si1", [0,1], [(0,10)]) )
1264 d.addCallback(lambda res: self.failUnlessEqual(res, {1: ['']}))
1266 # Also if there are only corrupted shares.
1267 d.addCallback(lambda ign: server.backend.get_shareset("si1").get_share(1))
1268 d.addCallback(lambda share: share.unlink())
1269 d.addCallback(lambda ign: read("si1", [0], [(0,10)]) )
1270 d.addCallback(lambda res: self.failUnlessEqual(res, {}))
# Mutable-container sizing semantics: growth via writes, the MAX_SIZE limit
# (DataTooLargeError), truncation via new_length, zero-fill of holes past the
# old end, the server's fills-holes-with-zero-bytes version flag, and
# deletion when new_length is 0.
1273 def test_container_size(self):
1274 server = self.create("test_container_size")
1275 aa = server.get_accountant().get_anonymous_account()
1276 read = aa.remote_slot_readv
1277 rstaraw = aa.remote_slot_testv_and_readv_and_writev
1278 secrets = ( self.write_enabler("we1"),
1279 self.renew_secret("we1"),
1280 self.cancel_secret("we1") )
# 100 bytes: "0000000000111111111122..." — ten digits, ten times each.
1281 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1283 d = self.allocate(aa, "si1", "we1", set([0,1,2]), 100)
1284 d.addCallback(lambda ign: rstaraw("si1", secrets,
1285 {0: ([], [(0,data)], len(data)+12)},
1287 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1289 # Trying to make the container too large (by sending a write vector
1290 # whose offset is too high) will raise an exception.
1291 TOOBIG = MutableDiskShare.MAX_SIZE + 10
1292 d.addCallback(lambda ign: self.shouldFail(DataTooLargeError,
1293 'make container too large', None,
1294 lambda: rstaraw("si1", secrets,
1295 {0: ([], [(TOOBIG,data)], None)},
# The previous in-range write (new_length=None) still works after the failure.
1298 d.addCallback(lambda ign: rstaraw("si1", secrets,
1299 {0: ([], [(0,data)], None)},
1301 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1303 d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1304 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:10]]}))
1306 # Sending a new_length shorter than the current length truncates the
1308 d.addCallback(lambda ign: rstaraw("si1", secrets,
1311 d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1312 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:9]]}))
1314 # Sending a new_length longer than the current length doesn't change
1316 d.addCallback(lambda ign: rstaraw("si1", secrets,
1319 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1320 d.addCallback(lambda ign: read("si1", [0], [(0, 20)]))
1321 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[:9]]}))
1323 # Sending a write vector whose start is after the end of the current
1324 # data doesn't reveal "whatever was there last time" (palimpsest),
1325 # but instead fills with zeroes.
1327 # To test this, we fill the data area with a recognizable pattern.
1328 pattern = ''.join([chr(i) for i in range(100)])
1329 d.addCallback(lambda ign: rstaraw("si1", secrets,
1330 {0: ([], [(0, pattern)], None)},
1332 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1333 # Then truncate the data...
1334 d.addCallback(lambda ign: rstaraw("si1", secrets,
1337 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1338 # Just confirm that you get an empty string if you try to read from
1339 # past the (new) endpoint now.
1340 d.addCallback(lambda ign: rstaraw("si1", secrets,
1341 {0: ([], [], None)},
1343 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[''],1:[''],2:['']}) ))
1345 # Then the extend the file by writing a vector which starts out past
1347 d.addCallback(lambda ign: rstaraw("si1", secrets,
1348 {0: ([], [(50, 'hellothere')], None)},
1350 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1351 # Now if you read the stuff between 20 (where we earlier truncated)
1352 # and 50, it had better be all zeroes.
1353 d.addCallback(lambda ign: rstaraw("si1", secrets,
1354 {0: ([], [], None)},
1356 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:['\x00'*30],1:[''],2:['']}) ))
1358 # Also see if the server explicitly declares that it supports this
1360 d.addCallback(lambda ign: aa.remote_get_version())
1361 def _check_declaration(ver):
1362 storage_v1_ver = ver["http://allmydata.org/tahoe/protocols/storage/v1"]
1363 self.failUnless(storage_v1_ver.get("fills-holes-with-zero-bytes"))
1364 d.addCallback(_check_declaration)
1366 # If the size is dropped to zero the share is deleted.
1367 d.addCallback(lambda ign: rstaraw("si1", secrets,
1368 {0: ([], [(0,data)], 0)},
1370 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1372 d.addCallback(lambda ign: read("si1", [0], [(0,10)]))
1373 d.addCallback(lambda res: self.failUnlessEqual(res, {}))
# Mutable-slot basics: freshly allocated slots read back as empty strings,
# a write lands at the requested offset, a bad write enabler is rejected
# with BadWriteEnablerError, and failed test vectors leave the data intact
# while reporting the actual on-disk values.
1376 def test_allocate(self):
1377 server = self.create("test_allocate")
1378 aa = server.get_accountant().get_anonymous_account()
1379 read = aa.remote_slot_readv
1380 write = aa.remote_slot_testv_and_readv_and_writev
1382 d = self.allocate(aa, "si1", "we1", set([0,1,2]), 100)
# Empty sharenum list means "all shares"; reads past the end yield "".
1384 d.addCallback(lambda ign: read("si1", [0], [(0, 10)]))
1385 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""]}))
1386 d.addCallback(lambda ign: read("si1", [], [(0, 10)]))
1387 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""], 1: [""], 2: [""]}))
1388 d.addCallback(lambda ign: read("si1", [0], [(100, 10)]))
1389 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [""]}))
1391 # try writing to one
1392 secrets = ( self.write_enabler("we1"),
1393 self.renew_secret("we1"),
1394 self.cancel_secret("we1") )
1395 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1397 d.addCallback(lambda ign: write("si1", secrets,
1398 {0: ([], [(0,data)], None)},
1400 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1402 d.addCallback(lambda ign: read("si1", [0], [(0,20)]))
1403 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["00000000001111111111"]}))
1404 d.addCallback(lambda ign: read("si1", [0], [(95,10)]))
1405 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["99999"]}))
1406 #d.addCallback(lambda ign: s0.remote_get_length())
1407 #d.addCallback(lambda res: self.failUnlessEqual(res, 100))
# A wrong write enabler must be rejected; the error message names the nodeid.
1409 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
1410 d.addCallback(lambda ign: self.shouldFail(BadWriteEnablerError, 'bad write enabler',
1411 "The write enabler was recorded by nodeid "
1412 "'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'.",
1413 lambda: write("si1", bad_secrets, {}, []) ))
1415 # this testv should fail
1416 d.addCallback(lambda ign: write("si1", secrets,
1417 {0: ([(0, 12, "eq", "444444444444"),
1418 (20, 5, "eq", "22222"),],
# A failed test vector reports (False, actual-values) and writes nothing.
1422 d.addCallback(lambda res: self.failUnlessEqual(res, (False,
1423 {0: ["000000000011", "22222"],
1426 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1427 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1429 # as should this one
1430 d.addCallback(lambda ign: write("si1", secrets,
1431 {0: ([(10, 5, "lt", "11111"),],
1435 d.addCallback(lambda res: self.failUnlessEqual(res, (False,
1439 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1440 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
# Exhaustive check of the test-vector comparison operators (lt, le, eq, ne,
# ge, gt) against the on-disk bytes "11111" at offset 10. For each operator
# the fail case leaves the data unchanged and the pass case (presumably
# writing "y"*100 — the write vector lines are elided here) replaces it;
# _reset restores the canonical data between cases.
1443 def test_operators(self):
1444 # test operators, the data we're comparing is '11111' in all cases.
1445 # test both fail+pass, reset data after each one.
1446 server = self.create("test_operators")
1447 aa = server.get_accountant().get_anonymous_account()
1449 secrets = ( self.write_enabler("we1"),
1450 self.renew_secret("we1"),
1451 self.cancel_secret("we1") )
1452 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1453 write = aa.remote_slot_testv_and_readv_and_writev
1454 read = aa.remote_slot_readv
# _reset (header elided): rewrite the canonical pattern to share 0.
1457 return write("si1", secrets,
1458 {0: ([], [(0,data)], None)},
1461 d = defer.succeed(None)
1462 d.addCallback(_reset)
# lt: "11111" < "11110" is false -> no write.
1465 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11110"),],
1469 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1470 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1471 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1472 d.addCallback(lambda ign: read("si1", [], [(0,100)]))
1473 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1474 d.addCallback(_reset)
# lt: "11111" < "11111" is false -> no write.
1476 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11111"),],
1480 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1481 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1482 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1483 d.addCallback(_reset)
# lt: "11111" < "11112" is true -> write happens.
1485 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "lt", "11112"),],
1489 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1490 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1491 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1492 d.addCallback(_reset)
# le: false only when the operand sorts below the stored value.
1495 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11110"),],
1499 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1500 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1501 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1502 d.addCallback(_reset)
1504 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11111"),],
1508 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1509 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1510 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1511 d.addCallback(_reset)
1513 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "le", "11112"),],
1517 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1518 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1519 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1520 d.addCallback(_reset)
# eq: passes only on an exact match.
1523 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "eq", "11112"),],
1527 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1528 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1529 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1530 d.addCallback(_reset)
1532 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "eq", "11111"),],
1536 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1537 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1538 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1539 d.addCallback(_reset)
# ne: the inverse of eq.
1542 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ne", "11111"),],
1546 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1547 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1548 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1549 d.addCallback(_reset)
1551 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ne", "11112"),],
1555 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1556 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1557 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1558 d.addCallback(_reset)
# ge: passes for operands at or below the stored value.
1561 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11110"),],
1565 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1566 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1567 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1568 d.addCallback(_reset)
1570 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11111"),],
1574 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1575 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1576 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1577 d.addCallback(_reset)
1579 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "ge", "11112"),],
1583 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1584 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1585 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1586 d.addCallback(_reset)
# gt: passes only for operands strictly below the stored value.
1589 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11110"),],
1593 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0: ["11111"]}) ))
1594 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1595 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["y"*100]}))
1596 d.addCallback(_reset)
1598 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11111"),],
1602 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1603 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1604 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1605 d.addCallback(_reset)
1607 d.addCallback(lambda ign: write("si1", secrets, {0: ([(10, 5, "gt", "11112"),],
1611 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1612 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1613 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1614 d.addCallback(_reset)
1616 # finally, test some operators against empty shares
1617 d.addCallback(lambda ign: write("si1", secrets, {1: ([(10, 5, "eq", "11112"),],
1621 d.addCallback(lambda res: self.failUnlessEqual(res, (False, {0: ["11111"]}) ))
1622 d.addCallback(lambda ign: read("si1", [0], [(0,100)]))
1623 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
1624 d.addCallback(_reset)
# Write distinct data to three shares in one writev, then read all shares
# at once with an empty sharenum list.
1627 def test_readv(self):
1628 server = self.create("test_readv")
1629 aa = server.get_accountant().get_anonymous_account()
1631 secrets = ( self.write_enabler("we1"),
1632 self.renew_secret("we1"),
1633 self.cancel_secret("we1") )
# NOTE(review): this first `data` assignment is dead — it is immediately
# rebound to the list below and never used.
1634 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1635 write = aa.remote_slot_testv_and_readv_and_writev
1636 read = aa.remote_slot_readv
# data[i] is the digit str(i) repeated 100 times, one string per share.
1637 data = [("%d" % i) * 100 for i in range(3)]
1639 d = defer.succeed(None)
1640 d.addCallback(lambda ign: write("si1", secrets,
1641 {0: ([], [(0,data[0])], None),
1642 1: ([], [(0,data[1])], None),
1643 2: ([], [(0,data[2])], None),
1645 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {}) ))
1647 d.addCallback(lambda ign: read("si1", [], [(0, 10)]))
1648 d.addCallback(lambda res: self.failUnlessEqual(res, {0: ["0"*10],
# Exercise writev against the cloud backend's chunked mutable-share storage
# (also run for disk). _check applies one writev, then verifies both the
# resulting data and the number of backend load/store operations, which pin
# down the chunking behavior.
1653 def test_writev(self):
1654 # This is run for both the disk and cloud backends, but it is particularly
1655 # designed to exercise the cloud backend's implementation of chunking for
1656 # mutable shares, assuming that PREFERRED_CHUNK_SIZE has been patched to 500.
1657 # Note that the header requires 472 bytes, so only the first 28 bytes of data are
1658 # in the first chunk.
1660 server = self.create("test_writev")
1661 aa = server.get_accountant().get_anonymous_account()
1662 read = aa.remote_slot_readv
1663 rstaraw = aa.remote_slot_testv_and_readv_and_writev
1664 secrets = ( self.write_enabler("we1"),
1665 self.renew_secret("we1"),
1666 self.cancel_secret("we1") )
# Apply writev to share 0, assert the write succeeded, check the expected
# load/store counts for the write, read the data back, and check the
# expected load count for the read. Counters are reset between phases.
1668 def _check(ign, writev, expected_data, expected_write_loads, expected_write_stores,
1669 expected_read_loads, should_exist):
1670 d2 = rstaraw("si1", secrets, {0: writev}, [])
1672 d2.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[]}) ))
1674 d2.addCallback(lambda res: self.failUnlessEqual(res, (True, {}) ))
1675 d2.addCallback(lambda ign: self.check_load_store_counts(expected_write_loads,
1676 expected_write_stores))
1677 d2.addCallback(lambda ign: self.reset_load_store_counts())
# Read one byte past the expected data to confirm the exact length.
1679 d2.addCallback(lambda ign: read("si1", [0], [(0, len(expected_data) + 1)]))
1680 if expected_data == "":
1681 d2.addCallback(lambda res: self.failUnlessEqual(res, {}))
1683 d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [expected_data]}))
1684 d2.addCallback(lambda ign: self.check_load_store_counts(expected_read_loads, 0))
1685 d2.addCallback(lambda ign: self.reset_load_store_counts())
1688 self.reset_load_store_counts()
1689 d = self.allocate(aa, "si1", "we1", set([0]), 2725)
1690 d.addCallback(_check, ([], [(0, "a"*10)], None),
# Holes between written vectors must read back as zero bytes.
1693 d.addCallback(_check, ([], [(20, "b"*18)], None),
1694 "a"*10 + "\x00"*10 + "b"*18,
1696 d.addCallback(_check, ([], [(1038, "c")], None),
1697 "a"*10 + "\x00"*10 + "b"*18 + "\x00"*(490+500+10) + "c",
1699 d.addCallback(_check, ([], [(0, "d"*1038)], None),
1702 d.addCallback(_check, ([], [(2167, "a"*54)], None),
1703 "d"*1038 + "c" + "\x00"*1128 + "a"*54,
1705 # This pattern was observed from the MDMF publisher in v1.9.1.
1706 # Notice the duplicated write of length 41 bytes at offset 0.
1707 d.addCallback(_check, ([], [(2167, "e"*54), (123, "f"*347), (2221, "g"*32), (470, "h"*136),
1708 (0, "i"*41), (606, "j"*66), (672, "k"*93), (59, "l"*64),
1709 (41, "m"*18), (0, "i"*41)], None),
1710 "i"*41 + "m"*18 + "l"*64 + "f"*347 + "h"*136 + "j"*66 + "k"*93 + "d"*273 + "c" + "\x00"*1128 +
1713 # This should delete all chunks.
1714 d.addCallback(_check, ([], [], 0),
# Re-run the MDMF pattern against the now-empty share: the previously
# present "d"/"c" bytes must not reappear — holes are zero-filled.
1717 d.addCallback(_check, ([], [(2167, "e"*54), (123, "f"*347), (2221, "g"*32), (470, "h"*136),
1718 (0, "i"*41), (606, "j"*66), (672, "k"*93), (59, "l"*64),
1719 (41, "m"*18), (0, "i"*41)], None),
1720 "i"*41 + "m"*18 + "l"*64 + "f"*347 + "h"*136 + "j"*66 + "k"*93 + "\x00"*1402 +
# Deleting mutable shares one at a time by writing new_length=0: each delete
# still reports all shares that existed at test-evaluation time, subsequent
# reads omit the deleted share, and once all shares are gone the shareset
# has zero overhead (and, on the disk backend, its directory is removed).
1725 def test_remove(self):
1726 server = self.create("test_remove")
1727 aa = server.get_accountant().get_anonymous_account()
1728 readv = aa.remote_slot_readv
1729 writev = aa.remote_slot_testv_and_readv_and_writev
1730 secrets = ( self.write_enabler("we1"),
1731 self.renew_secret("we1"),
1732 self.cancel_secret("we1") )
1734 d = defer.succeed(None)
1735 d.addCallback(lambda ign: self.allocate(aa, "si1", "we1", set([0,1,2]), 100))
1736 # delete sh0 by setting its size to zero
1737 d.addCallback(lambda ign: writev("si1", secrets,
1740 # the answer should mention all the shares that existed before the
1742 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {0:[],1:[],2:[]}) ))
1743 # but a new read should show only sh1 and sh2
1744 d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1745 d.addCallback(lambda res: self.failUnlessEqual(res, {1: [""], 2: [""]}))
1747 # delete sh1 by setting its size to zero
1748 d.addCallback(lambda ign: writev("si1", secrets,
1751 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {1:[],2:[]}) ))
1752 d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1753 d.addCallback(lambda res: self.failUnlessEqual(res, {2: [""]}))
1755 # delete sh2 by setting its size to zero
1756 d.addCallback(lambda ign: writev("si1", secrets,
1759 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {2:[]}) ))
1760 d.addCallback(lambda ign: readv("si1", [], [(0,10)]))
1761 d.addCallback(lambda res: self.failUnlessEqual(res, {}))
1763 d.addCallback(lambda ign: server.backend.get_shareset("si1").get_overhead())
1764 d.addCallback(lambda overhead: self.failUnlessEqual(overhead, 0))
1766 # and the shareset directory should now be gone. This check is only
1767 # applicable to the disk backend.
1768 def _check_gone(ign):
1769 si = base32.b2a("si1")
1770 # note: this is a detail of the disk backend, and may change in the future
1772 prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1773 sidir = os.path.join(prefixdir, si)
# The two-character prefix directory survives; the per-SI directory must not.
1774 self.failUnless(os.path.exists(prefixdir), prefixdir)
1775 self.failIf(os.path.exists(sidir), sidir)
1777 if isinstance(server.backend, DiskBackend):
1778 d.addCallback(_check_gone)
# Assert that two lease lists are pairwise equal. When with_timestamps is
# True the renewal/expiration times are compared as well; otherwise
# (presumably, guarded by a conditional missing from this extract) only the
# owner numbers are compared.
# NOTE(review): the lines binding `a` and `b` (and the `if with_timestamps:`
# guard implied by the keyword argument) are missing from this extract.
1781 def compare_leases(self, leases_a, leases_b, with_timestamps=True):
1782 self.failUnlessEqual(len(leases_a), len(leases_b))
1783 for i in range(len(leases_a)):
1786 self.failUnlessEqual(a.owner_num, b.owner_num)
1788 self.failUnlessEqual(a.renewal_time, b.renewal_time)
1789 self.failUnlessEqual(a.expiration_time, b.expiration_time)
# Exercise lease bookkeeping for mutable shares across two accounts (the
# anonymous account `aa` and the starter account `sa`): writing creates a
# lease, renewing keeps the lease count stable, a second account gets its own
# lease, and plain reads do not touch lease timestamps.
# NOTE(review): this extract is missing the `def secrets(n):` header for the
# nested helper returned at line 1797, the `def _check(ign):` header for the
# callback added at line 1879, and the trailing `return d` — restore from
# upstream.
1791 def test_mutable_leases(self):
1792 server = self.create("test_mutable_leases")
1793 aa = server.get_accountant().get_anonymous_account()
1794 sa = server.get_accountant().get_starter_account()
1797 return ( self.write_enabler("we1"),
1798 self.renew_secret("we1-%d" % n),
1799 self.cancel_secret("we1-%d" % n) )
1800 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1801 aa_write = aa.remote_slot_testv_and_readv_and_writev
1802 sa_write = sa.remote_slot_testv_and_readv_and_writev
1803 read = aa.remote_slot_readv
1805 # There is no such method as remote_cancel_lease -- see ticket #1528.
1806 self.failIf(hasattr(aa, 'remote_cancel_lease'),
1807 "aa should not have a 'remote_cancel_lease' method/attribute")
1809 # create a random non-numeric file in the bucket directory, to
1810 # exercise the code that's supposed to ignore those.
# NOTE(review): workdir name "test_leases" does not match this test's name
# "test_mutable_leases" — confirm against upstream whether this is intended.
1811 bucket_dir = os.path.join(self.workdir("test_leases"),
1812 "shares", storage_index_to_dir("six"))
1813 os.makedirs(bucket_dir)
1814 fileutil.write(os.path.join(bucket_dir, "ignore_me.txt"),
1815 "you ought to be ignoring me\n")
1817 create_mutable_disk_share(os.path.join(bucket_dir, "0"), server.get_serverid(),
1818 secrets(0)[0], storage_index="six", shnum=0)
1820 aa.add_share("six", 0, 0, SHARETYPE_MUTABLE)
1821 # adding a share does not immediately add a lease
1822 self.failUnlessEqual(len(aa.get_leases("six")), 0)
1824 aa.add_or_renew_default_lease("six", 0)
1825 self.failUnlessEqual(len(aa.get_leases("six")), 1)
1827 d = defer.succeed(None)
1829 d.addCallback(lambda ign: aa_write("si0", secrets(1), {0: ([], [(0,data)], None)}, []))
1830 d.addCallback(lambda res: self.failUnlessEqual(res, (True, {})))
1832 # add-lease on a missing storage index is silently ignored
1833 d.addCallback(lambda ign: aa.remote_add_lease("si18", "", ""))
1834 d.addCallback(lambda res: self.failUnless(res is None, res))
1835 d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si18")), 0))
1837 # create a lease by writing
1838 d.addCallback(lambda ign: aa_write("si1", secrets(2), {0: ([], [(0,data)], None)}, []))
1839 d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si1")), 1))
# Renewing with the same secret must not add a second lease.
1842 d.addCallback(lambda ign: aa.remote_renew_lease("si1", secrets(2)[1]))
1843 d.addCallback(lambda ign: self.failUnlessEqual(len(aa.get_leases("si1")), 1))
1845 # now allocate another lease using a different account
1846 d.addCallback(lambda ign: sa_write("si1", secrets(3), {0: ([], [(0,data)], None)}, []))
1848 aa_leases = aa.get_leases("si1")
1849 sa_leases = sa.get_leases("si1")
1851 self.failUnlessEqual(len(aa_leases), 1)
1852 self.failUnlessEqual(len(sa_leases), 1)
1854 d2 = defer.succeed(None)
1855 d2.addCallback(lambda ign: aa.remote_renew_lease("si1", secrets(2)[1]))
1856 d2.addCallback(lambda ign: self.compare_leases(aa_leases, aa.get_leases("si1"),
1857 with_timestamps=False))
1859 d2.addCallback(lambda ign: sa.remote_renew_lease("si1", "shouldn't matter"))
1860 d2.addCallback(lambda ign: self.compare_leases(sa_leases, sa.get_leases("si1"),
1861 with_timestamps=False))
1863 # Get a new copy of the leases, with the current timestamps. Reading
1864 # data should leave the timestamps alone.
1865 d2.addCallback(lambda ign: aa.get_leases("si1"))
1866 def _check2(new_aa_leases):
1867 # reading shares should not modify the timestamp
1868 d3 = read("si1", [], [(0, 200)])
1869 d3.addCallback(lambda ign: self.compare_leases(new_aa_leases, aa.get_leases("si1"),
1870 with_timestamps=False))
1872 d3.addCallback(lambda ign: aa_write("si1", secrets(2),
1873 {0: ([], [(500, "make me bigger")], None)}, []))
1874 d3.addCallback(lambda ign: self.compare_leases(new_aa_leases, aa.get_leases("si1"),
1875 with_timestamps=False))
1877 d2.addCallback(_check2)
1879 d.addCallback(_check)
# Storage-server tests against the NullBackend: shares can be allocated and
# "written", and afterwards the buckets exist but reading returns no data
# (the null backend discards everything written to it).
# NOTE(review): several `return d`/`return d2`/`return d3` lines appear to be
# missing from this extract — restore from upstream.
1883 class ServerWithNullBackend(ServiceParentMixin, WorkdirMixin, ServerMixin, unittest.TestCase):
1884 def test_null_backend(self):
1885 workdir = self.workdir("test_null_backend")
1886 backend = NullBackend()
1887 server = StorageServer("\x00" * 20, backend, workdir)
1888 server.setServiceParent(self.sparent)
1889 aa = server.get_accountant().get_anonymous_account()
1891 d = self.allocate(aa, "vid", [0,1,2], 75)
1892 def _allocated( (already, writers) ):
1893 self.failUnlessEqual(already, set())
1894 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
1896 d2 = for_items(self._write_and_close, writers)
1898 # The shares should be present but have no data.
1899 d2.addCallback(lambda ign: aa.remote_get_buckets("vid"))
1900 def _check(buckets):
1901 self.failUnlessEqual(set(buckets.keys()), set([0,1,2]))
1902 d3 = defer.succeed(None)
1903 d3.addCallback(lambda ign: buckets[0].remote_read(0, 25))
1904 d3.addCallback(lambda res: self.failUnlessEqual(res, ""))
1906 d2.addCallback(_check)
1908 d.addCallback(_allocated)
# Mixin that builds a StorageServer on top of a MockContainer-backed
# CloudBackend, and exposes the container's load/store counters so tests can
# assert how many backend round trips an operation performed.
# NOTE(review): `create` has no visible `return server` in this extract —
# presumably it returns the server; confirm against upstream.
1912 class WithMockCloudBackend(ServiceParentMixin, WorkdirMixin):
1913 def create(self, name, detached=False, readonly=False, reserved_space=0, klass=StorageServer):
1915 workdir = self.workdir(name)
1916 self._container = MockContainer(workdir)
1917 backend = CloudBackend(self._container)
1918 server = klass("\x00" * 20, backend, workdir,
1919 stats_provider=FakeStatsProvider())
1921 server.setServiceParent(self.sparent)
# Reset the container's cumulative load/store counters between phases.
1924 def reset_load_store_counts(self):
1925 self._container.reset_load_store_counts()
# Assert the exact number of container loads and stores seen so far.
1927 def check_load_store_counts(self, expected_load_count, expected_store_count):
1928 self.failUnlessEqual((self._container.get_load_count(), self._container.get_store_count()),
1929 (expected_load_count, expected_store_count))
# Mixin that builds a StorageServer on top of a DiskBackend. The two
# load/store-count methods mirror WithMockCloudBackend's interface; a disk
# backend has no such counters, so they are presumably no-ops (their bodies
# are missing from this extract, along with `create`'s `return server`).
1932 class WithDiskBackend(ServiceParentMixin, WorkdirMixin):
1933 def create(self, name, detached=False, readonly=False, reserved_space=0, klass=StorageServer):
1934 workdir = self.workdir(name)
1935 backend = DiskBackend(workdir, readonly=readonly, reserved_space=reserved_space)
1936 server = klass("\x00" * 20, backend, workdir,
1937 stats_provider=FakeStatsProvider())
1939 server.setServiceParent(self.sparent)
1942 def reset_load_store_counts(self):
1945 def check_load_store_counts(self, expected_loads, expected_stores):
# Runs the shared ServerTest suite against the mock cloud backend, plus
# cloud-specific tests that transient container failures are retried with
# backoff, logged at escalating levels, and never leak share data into log
# messages or exception strings.
# NOTE(review): this extract is missing the `def setUp(self):` header above
# line 1951, the `t = {'count': 0}` style counter read at line 1974, the
# `def _done(res):` header above line 2000, the `else:` before line 2008,
# and several `return d` lines — restore from upstream.
1949 class ServerWithMockCloudBackend(WithMockCloudBackend, ServerTest, unittest.TestCase):
1951 ServiceParentMixin.setUp(self)
1953 # A smaller chunk size causes the tests to exercise more cases in the chunking implementation.
1954 self.patch(cloud_common, 'PREFERRED_CHUNK_SIZE', 500)
1956 # This causes ContainerListMixin to be exercised.
1957 self.patch(mock_cloud, 'MAX_KEYS', 2)
1959 def test_bad_container_version(self):
1960 return ServerTest.test_bad_container_version(self)
1961 test_bad_container_version.todo = "The cloud backend hasn't been modified to fix ticket #1566."
# Map a foolscap log level to a readable name (falls back to str(level)).
1964 def _describe_level(self, level):
1965 return getattr(LogEvent, 'LEVELMAP', {}).get(level, str(level))
# Common driver: make the first `failure_count` _put_object calls fail with a
# 500 CloudServiceError, capture any above-OPERATIONAL log levels into
# `levels`, then allocate and write one share so the retry path runs.
1967 def _test_cloud_retry(self, name, failure_count, levels):
1968 self.patch(cloud_common, 'BACKOFF_SECONDS_BEFORE_RETRY', (0, 0.1, 0.2))
1971 old_put_object = MockContainer._put_object
1972 def call_put_object(self, ign, object_name, data, content_type=None, metadata={}):
1974 if t['count'] <= failure_count:
1975 return defer.fail(CloudServiceError("XML", 500, "Internal error", "response"))
1977 return old_put_object(self, ign, object_name, data, content_type=content_type, metadata=metadata)
1978 self.patch(MockContainer, '_put_object', call_put_object)
1980 def call_log_msg(*args, **kwargs):
1981 # the log message and parameters should not include the data
1982 self.failIfIn("%25d" % (0,), repr( (args, kwargs) ))
1983 level = kwargs.get("level", OPERATIONAL)
1984 if level > OPERATIONAL:
1985 levels.append(level)
1986 self.patch(cloud_common.log, 'msg', call_log_msg)
1988 server = self.create(name)
1989 aa = server.get_accountant().get_anonymous_account()
1991 d = self.allocate(aa, "vid", [0], 75)
1992 d.addCallback(lambda (already, writers): for_items(self._write_and_close, writers))
# 4 failures exceed the 3 configured retries, so the write must fail with
# CloudError after logging INFREQUENT for each retry and WEIRD at give-up.
1995 def test_cloud_retry_fail(self):
1996 levels = [] # list of logging levels above OPERATIONAL for calls to log.msg
1997 d = self._test_cloud_retry("test_cloud_retry_fail", 4, levels)
1998 # shouldFail would check repr(res.value.args[0]) which is not what we want
2000 if isinstance(res, Failure):
2001 res.trap(cloud_common.CloudError)
2002 self.failUnlessIn(", 500, 'Internal error', 'response')", str(res.value))
2003 # the stringified exception should not include the data
2004 self.failIfIn("%25d" % (0,), str(res.value))
2005 desc = ", ".join(map(self._describe_level, levels))
2006 self.failUnlessEqual(levels, [INFREQUENT]*4 + [WEIRD], desc)
2008 self.fail("was supposed to raise CloudError, not get %r" % (res,))
# 3 failures fit within the retries, so the write eventually succeeds.
2012 def test_cloud_retry_succeed(self):
2013 levels = [] # list of logging levels above OPERATIONAL for calls to log.msg
2014 d = self._test_cloud_retry("test_cloud_retry_succeed", 3, levels)
2016 desc = ", ".join(map(self._describe_level, levels))
2017 self.failUnlessEqual(levels, [INFREQUENT]*3 + [WEIRD], desc)
# Runs the shared ServerTest suite against the disk backend, plus tests for
# disk-only behaviour: read-only mode, sparse >4GiB shares, incoming-share
# cleanup, abort accounting, disconnect cleanup, and reserved-space limits.
# NOTE(review): many original lines are missing from this extract (blank
# lines, `return d` statements, some nested-`def` headers such as `_check`
# at 2093 and `_mock` at 2197, the `OVERHEAD = ...` binding used at 2188,
# and the `rs`/`cs`/`size` setup of OFF_test_immutable_leases) — restore
# from upstream before relying on any of these tests.
2022 class ServerWithDiskBackend(WithDiskBackend, ServerTest, unittest.TestCase):
2023 # The following tests are for behaviour that is only supported by a disk backend.
# A read-only server accepts no writers and (when disk stats are available)
# reports zero available space.
2025 def test_readonly(self):
2026 server = self.create("test_readonly", readonly=True)
2027 aa = server.get_accountant().get_anonymous_account()
2029 d = self.allocate(aa, "vid", [0,1,2], 75)
2030 def _allocated( (already, writers) ):
2031 self.failUnlessEqual(already, set())
2032 self.failUnlessEqual(writers, {})
2034 stats = server.get_stats()
2035 self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"], 0)
2036 if "storage_server.disk_avail" in stats:
2037 # Some platforms may not have an API to get disk stats.
2038 # But if there are stats, readonly_storage means disk_avail=0
2039 self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
2040 d.addCallback(_allocated)
# Write two bytes just past the 4 GiB boundary and read them back; relies on
# sparse-file support, hence the platform and free-space skips.
2043 def test_large_share(self):
2044 syslow = platform.system().lower()
2045 if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
2046 raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
2048 avail = fileutil.get_available_space('.', 512*2**20)
2049 if avail <= 4*2**30:
2050 raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
2052 server = self.create("test_large_share")
2053 aa = server.get_accountant().get_anonymous_account()
2055 d = self.allocate(aa, "allocate", [0], 2**32+2)
2056 def _allocated( (already, writers) ):
2057 self.failUnlessEqual(already, set())
2058 self.failUnlessEqual(set(writers.keys()), set([0]))
2060 shnum, bucket = writers.items()[0]
2062 # This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
2063 d2 = defer.succeed(None)
2064 d2.addCallback(lambda ign: bucket.remote_write(2**32, "ab"))
2065 d2.addCallback(lambda ign: bucket.remote_close())
2067 d2.addCallback(lambda ign: aa.remote_get_buckets("allocate"))
2068 d2.addCallback(lambda readers: readers[shnum].remote_read(2**32, 2))
2069 d2.addCallback(lambda res: self.failUnlessEqual(res, "ab"))
2071 d.addCallback(_allocated)
# After closing writers, the per-SI and per-prefix incoming directories must
# be cleaned up, while the top-level incoming directory remains.
2074 def test_remove_incoming(self):
2075 server = self.create("test_remove_incoming")
2076 aa = server.get_accountant().get_anonymous_account()
2078 d = self.allocate(aa, "vid", range(3), 25)
2079 def _write_and_check( (already, writers) ):
2080 d2 = defer.succeed(None)
2081 for i, bw in sorted(writers.items()):
2082 incoming_share_home = bw._share._get_path()
2083 d2.addCallback(self._write_and_close, i, bw)
# NOTE(review): the `def _check(ign):` header above these lines is missing
# from this extract.
2086 incoming_si_dir = os.path.dirname(incoming_share_home)
2087 incoming_prefix_dir = os.path.dirname(incoming_si_dir)
2088 incoming_dir = os.path.dirname(incoming_prefix_dir)
2090 self.failIf(os.path.exists(incoming_si_dir), incoming_si_dir)
2091 self.failIf(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
2092 self.failUnless(os.path.exists(incoming_dir), incoming_dir)
2093 d2.addCallback(_check)
2095 d.addCallback(_write_and_check)
2098 def test_abort(self):
2099 # remote_abort, when called on a writer, should make sure that
2100 # the allocated size of the bucket is not counted by the storage
2101 # server when accounting for space.
2102 server = self.create("test_abort")
2103 aa = server.get_accountant().get_anonymous_account()
2105 d = self.allocate(aa, "allocate", [0, 1, 2], 150)
2106 def _allocated( (already, writers) ):
2107 self.failIfEqual(server.allocated_size(), 0)
2109 # Now abort the writers.
2110 d2 = for_items(self._abort_writer, writers)
2111 d2.addCallback(lambda ign: self.failUnlessEqual(server.allocated_size(), 0))
2113 d.addCallback(_allocated)
2116 def test_disconnect(self):
2117 # simulate a disconnection
2118 server = self.create("test_disconnect")
2119 aa = server.get_accountant().get_anonymous_account()
2120 canary = FakeCanary()
2122 d = self.allocate(aa, "disconnect", [0,1,2], 75, canary)
2123 def _allocated( (already, writers) ):
2124 self.failUnlessEqual(already, set())
2125 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# NOTE(review): the body of this loop (presumably invoking each registered
# disconnector) is missing from this extract.
2126 for (f,args,kwargs) in canary.disconnectors.values():
2128 d.addCallback(_allocated)
2130 # returning from _allocated ought to delete the incoming shares
2131 d.addCallback(lambda ign: self.allocate(aa, "disconnect", [0,1,2], 75))
2132 def _allocated2( (already, writers) ):
2133 self.failUnlessEqual(already, set())
2134 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
2135 d.addCallback(_allocated2)
# Exercise reserved-space accounting with mocked disk stats: 15000 bytes
# free minus 10000 reserved leaves 5000 for shares.
2138 @mock.patch('allmydata.util.fileutil.get_disk_stats')
2139 def test_reserved_space(self, mock_get_disk_stats):
2140 reserved_space=10000
2141 mock_get_disk_stats.return_value = {
2142 'free_for_nonroot': 15000,
2143 'avail': max(15000 - reserved_space, 0),
2146 server = self.create("test_reserved_space", reserved_space=reserved_space)
2147 aa = server.get_accountant().get_anonymous_account()
2149 # 15k available, 10k reserved, leaves 5k for shares
2151 # a newly created and filled share incurs this much overhead, beyond
2152 # the size we request.
# NOTE(review): the `OVERHEAD = ...` binding (used at line 2188) is missing
# from this extract.
2154 LEASE_SIZE = 4+32+32+4
2155 canary = FakeCanary(True)
2157 d = self.allocate(aa, "vid1", [0,1,2], 1000, canary)
2158 def _allocated( (already, writers) ):
2159 self.failUnlessEqual(len(writers), 3)
2160 # now the StorageServer should have 3000 bytes provisionally
2161 # allocated, allowing only 2000 more to be claimed
2162 self.failUnlessEqual(len(server._active_writers), 3)
2163 self.writers = writers
2166 # allocating 1001-byte shares only leaves room for one
2167 d2 = self.allocate(aa, "vid2", [0,1,2], 1001, canary)
2168 def _allocated2( (already2, writers2) ):
2169 self.failUnlessEqual(len(writers2), 1)
2170 self.failUnlessEqual(len(server._active_writers), 4)
2172 # we abandon the first set, so their provisional allocation should be
2174 d3 = for_items(self._abort_writer, self.writers)
2175 #def _del_writers(ign):
2177 #d3.addCallback(_del_writers)
2178 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 1))
2180 # and we close the second set, so their provisional allocation should
2181 # become real, long-term allocation, and grows to include the
2183 d3.addCallback(lambda ign: for_items(self._write_and_close, writers2))
2184 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 0))
2186 d2.addCallback(_allocated2)
2188 allocated = 1001 + OVERHEAD + LEASE_SIZE
2190 # we have to manually increase available, since we're not doing real
# NOTE(review): the `def _mock(ign):` header above these lines is missing
# from this extract.
2193 mock_get_disk_stats.return_value = {
2194 'free_for_nonroot': 15000 - allocated,
2195 'avail': max(15000 - allocated - reserved_space, 0),
2197 d2.addCallback(_mock)
2199 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
2200 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
2201 d2.addCallback(lambda ign: self.allocate(aa, "vid3", range(100), 100, canary))
2202 def _allocated3( (already3, writers3) ):
2203 self.failUnlessEqual(len(writers3), 39)
2204 self.failUnlessEqual(len(server._active_writers), 39)
2206 d3 = for_items(self._abort_writer, writers3)
2207 d3.addCallback(lambda ign: self.failUnlessEqual(len(server._active_writers), 0))
2208 d3.addCallback(lambda ign: server.disownServiceParent())
2210 d2.addCallback(_allocated3)
2211 d.addCallback(_allocated)
# Disabled (OFF_ prefix) immutable-lease test; kept for reference.
# NOTE(review): the setup binding `rs`, `cs`, `size`, and the loop header
# around lines 2224-2225 are missing from this extract.
2214 def OFF_test_immutable_leases(self):
2215 server = self.create("test_immutable_leases")
2216 aa = server.get_accountant().get_anonymous_account()
2217 canary = FakeCanary()
2218 sharenums = range(5)
2224 rs.append(hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
2225 cs.append(hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
2227 d = aa.remote_allocate_buckets("si0", rs[0], cs[0],
2228 sharenums, size, canary)
2229 def _allocated( (already, writers) ):
2230 self.failUnlessEqual(len(already), 0)
2231 self.failUnlessEqual(len(writers), 5)
2233 d2 = for_items(self._close_writer, writers)
2235 d2.addCallback(lambda ign: list(aa.get_leases("si0")))
2236 d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 1))
2238 d2.addCallback(lambda ign: aa.remote_allocate_buckets("si1", rs[1], cs[1],
2239 sharenums, size, canary))
2241 d.addCallback(_allocated)
2243 def _allocated2( (already, writers) ):
2244 d2 = for_items(self._close_writer, writers)
2246 # take out a second lease on si1
2247 d2.addCallback(lambda ign: aa.remote_allocate_buckets("si1", rs[2], cs[2],
2248 sharenums, size, canary))
2250 d.addCallback(_allocated2)
2252 def _allocated2a( (already, writers) ):
2253 self.failUnlessEqual(len(already), 5)
2254 self.failUnlessEqual(len(writers), 0)
2256 d2 = defer.succeed(None)
2257 d2.addCallback(lambda ign: list(aa.get_leases("si1")))
2258 d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 2))
2260 # and a third lease, using add-lease
2261 d2.addCallback(lambda ign: aa.remote_add_lease("si1", rs[3], cs[3]))
2263 d2.addCallback(lambda ign: list(aa.get_leases("si1")))
2264 d2.addCallback(lambda leases: self.failUnlessEqual(len(leases), 3))
2266 # add-lease on a missing storage index is silently ignored
2267 d2.addCallback(lambda ign: aa.remote_add_lease("si18", "", ""))
2268 d2.addCallback(lambda res: self.failUnlessEqual(res, None))
2270 # check that si0 is readable
2271 d2.addCallback(lambda ign: aa.remote_get_buckets("si0"))
2272 d2.addCallback(lambda readers: self.failUnlessEqual(len(readers), 5))
2274 # renew the first lease. Only the proper renew_secret should work
2275 d2.addCallback(lambda ign: aa.remote_renew_lease("si0", rs[0]))
2276 d2.addCallback(lambda ign: self.shouldFail(IndexError, 'wrong secret 1', None,
2277 lambda: aa.remote_renew_lease("si0", cs[0]) ))
2278 d2.addCallback(lambda ign: self.shouldFail(IndexError, 'wrong secret 2', None,
2279 lambda: aa.remote_renew_lease("si0", rs[1]) ))
2281 # check that si0 is still readable
2282 d2.addCallback(lambda ign: aa.remote_get_buckets("si0"))
2283 d2.addCallback(lambda readers: self.failUnlessEqual(len(readers), 5))
2285 # There is no such method as remote_cancel_lease for now -- see
2287 d2.addCallback(lambda ign: self.failIf(hasattr(aa, 'remote_cancel_lease'),
2288 "aa should not have a 'remote_cancel_lease' method/attribute"))
2290 # test overlapping uploads
2291 d2.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[4], cs[4],
2292 sharenums, size, canary))
2294 d.addCallback(_allocated2a)
2296 def _allocated4( (already, writers) ):
2297 self.failUnlessEqual(len(already), 0)
2298 self.failUnlessEqual(len(writers), 5)
2300 d2 = defer.succeed(None)
2301 d2.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[5], cs[5],
2302 sharenums, size, canary))
2303 def _allocated5( (already2, writers2) ):
2304 self.failUnlessEqual(len(already2), 0)
2305 self.failUnlessEqual(len(writers2), 0)
2307 d3 = for_items(self._close_writer, writers)
2309 d3.addCallback(lambda ign: list(aa.get_leases("si3")))
2310 d3.addCallback(lambda leases: self.failUnlessEqual(len(leases), 1))
2312 d3.addCallback(lambda ign: aa.remote_allocate_buckets("si3", rs[5], cs[5],
2313 sharenums, size, canary))
2315 d2.addCallback(_allocated5)
2317 def _allocated6( (already3, writers3) ):
2318 self.failUnlessEqual(len(already3), 5)
2319 self.failUnlessEqual(len(writers3), 0)
2321 d3 = defer.succeed(None)
2322 d3.addCallback(lambda ign: list(aa.get_leases("si3")))
2323 d3.addCallback(lambda leases: self.failUnlessEqual(len(leases), 2))
2325 d2.addCallback(_allocated6)
2327 d.addCallback(_allocated4)
# Runs the shared MutableServerTest suite against the mock cloud backend,
# with a reduced chunk size and listing page size to exercise chunking and
# ContainerListMixin paging.
# NOTE(review): the `def setUp(self):` header above line 2333 is missing
# from this extract — restore from upstream.
2331 class MutableServerWithMockCloudBackend(WithMockCloudBackend, MutableServerTest, unittest.TestCase):
2333 ServiceParentMixin.setUp(self)
2335 # A smaller chunk size causes the tests to exercise more cases in the chunking implementation.
2336 self.patch(cloud_common, 'PREFERRED_CHUNK_SIZE', 500)
2338 # This causes ContainerListMixin to be exercised.
2339 self.patch(mock_cloud, 'MAX_KEYS', 2)
2341 def test_bad_magic(self):
2342 return MutableServerTest.test_bad_magic(self)
2343 test_bad_magic.todo = "The cloud backend hasn't been modified to fix ticket #1566."
# Runs the shared MutableServerTest suite against the disk backend.
# NOTE(review): the class body's `pass` statement appears to be missing
# from this extract.
2346 class MutableServerWithDiskBackend(WithDiskBackend, MutableServerTest, unittest.TestCase):
2347 # There are no mutable tests specific to a disk backend.
2351 class MDMFProxies(WithDiskBackend, ShouldFailMixin, unittest.TestCase):
# Per-test setup: create a disk-backed server, wrap its anonymous account in
# a RemoteBucket, and build the fixed fixture values (secrets, segments,
# hashes, serialized hash trees) used by the MDMF/SDMF layout tests.
# NOTE(review): the line binding `self.block` (referenced elsewhere in this
# class, e.g. build_test_mdmf_share) is missing from this extract.
2352 def init(self, name):
2353 self._lease_secret = itertools.count()
2354 self.server = self.create(name)
2355 self.aa = self.server.get_accountant().get_anonymous_account()
2356 self.rref = RemoteBucket()
2357 self.rref.target = self.aa
2358 self.secrets = (self.write_enabler("we_secret"),
2359 self.renew_secret("renew_secret"),
2360 self.cancel_secret("cancel_secret"))
2361 self.segment = "aaaaaa"
2363 self.salt = "a" * 16
2364 self.block_hash = "a" * 32
2365 self.block_hash_tree = [self.block_hash for i in xrange(6)]
2366 self.share_hash = self.block_hash
2367 self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
2368 self.signature = "foobarbaz"
2369 self.verification_key = "vvvvvv"
2370 self.encprivkey = "private"
2371 self.root_hash = self.block_hash
2372 self.salt_hash = self.root_hash
2373 self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
2374 self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
2375 self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
2376 # blockhashes and salt hashes are serialized in the same way,
2377 # only we lop off the first element and store that in the
2379 self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
def write_enabler(self, we_tag):
    """Derive a deterministic write-enabler secret from *we_tag*."""
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Derive a deterministic lease-renewal secret from *tag*."""
    tag_str = str(tag)
    return hashutil.tagged_hash("renew_blah", tag_str)
def cancel_secret(self, tag):
    """Derive a deterministic lease-cancel secret from *tag*."""
    tag_str = str(tag)
    return hashutil.tagged_hash("cancel_blah", tag_str)
# Build a syntactically valid MDMF share byte string from the fixture values
# set up in init(): checkstring, encoding parameters, offset table, private
# key, share hash chain, signature, verification key, padding, share data,
# and block hash tree. Also records the offset table in self.offsets and the
# checkstring in self.checkstring for later assertions.
# NOTE(review): most struct.pack argument lines, the `sharedata = ""` /
# loop scaffolding, the `self.offsets = {}` binding, and the final
# `return data` are missing from this extract — restore from upstream.
2390 def build_test_mdmf_share(self, tail_segment=False, empty=False):
2391 # Start with the checkstring
2392 data = struct.pack(">BQ32s",
2396 self.checkstring = data
2397 # Next, the encoding parameters
2399 data += struct.pack(">BBQQ",
2405 data += struct.pack(">BBQQ",
2411 data += struct.pack(">BBQQ",
2416 # Now we'll build the offsets.
2418 if not tail_segment and not empty:
2420 sharedata += self.salt + self.block
2423 sharedata += self.salt + self.block
2424 sharedata += self.salt + "a"
2426 # The encrypted private key comes after the shares + salts
2427 offset_size = struct.calcsize(MDMFOFFSETS)
2428 encrypted_private_key_offset = len(data) + offset_size
2429 # The share hash chain comes after the private key
2430 sharehashes_offset = encrypted_private_key_offset + \
2431 len(self.encprivkey)
2433 # The signature comes after the share hash chain.
2434 signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
2436 verification_key_offset = signature_offset + len(self.signature)
2437 verification_key_end = verification_key_offset + \
2438 len(self.verification_key)
# The share data region starts after fixed-size reserved areas for the
# private key, signature, verification key, and share hash chain.
2440 share_data_offset = offset_size
2441 share_data_offset += PRIVATE_KEY_SIZE
2442 share_data_offset += SIGNATURE_SIZE
2443 share_data_offset += VERIFICATION_KEY_SIZE
2444 share_data_offset += SHARE_HASH_CHAIN_SIZE
2446 blockhashes_offset = share_data_offset + len(sharedata)
2447 eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
2449 data += struct.pack(MDMFOFFSETS,
2450 encrypted_private_key_offset,
2453 verification_key_offset,
2454 verification_key_end,
# Record the computed offsets so tests can slice the share precisely.
2460 self.offsets['enc_privkey'] = encrypted_private_key_offset
2461 self.offsets['block_hash_tree'] = blockhashes_offset
2462 self.offsets['share_hash_chain'] = sharehashes_offset
2463 self.offsets['signature'] = signature_offset
2464 self.offsets['verification_key'] = verification_key_offset
2465 self.offsets['share_data'] = share_data_offset
2466 self.offsets['verification_key_end'] = verification_key_end
2467 self.offsets['EOF'] = eof_offset
2470 data += self.encprivkey
2472 data += self.share_hash_chain_s
2474 data += self.signature
2475 # and the verification key
2476 data += self.verification_key
2477 # Then we'll add in gibberish until we get to the right point.
2478 nulls = "".join([" " for i in xrange(len(data), share_data_offset)])
2481 # Then the share data
2484 data += self.block_hash_tree_s
# Write the MDMF test share built by build_test_mdmf_share to the storage
# server under `storage_index`, via a single test-and-write vector, and
# assert the write succeeded.
# NOTE(review): the signature continuation lines (storage_index,
# tail_segment=False, empty=False), the docstring delimiters, the `tws = {}`
# and `readv = ...` bindings, and the trailing `return d` are missing from
# this extract — restore from upstream.
2487 def write_test_share_to_server(self,
2492 I write some data for the read tests to read to self.aa
2494 If tail_segment=True, then I will write a share that has a
2495 smaller tail segment than other segments.
2497 write = self.aa.remote_slot_testv_and_readv_and_writev
2498 data = self.build_test_mdmf_share(tail_segment, empty)
2499 # Finally, we write the whole thing to the storage server in one
# Test vector: succeed only if the slot is currently empty at offset 0.
2501 testvs = [(0, 1, "eq", "")]
2503 tws[0] = (testvs, [(0, data)], None)
2505 d = write(storage_index, self.secrets, tws, readv)
2506 d.addCallback(lambda res: self.failUnless(res[0]))
# Build an SDMF-layout share byte string from the fixture values: prefix
# (version, seqnum, root hash, salt, encoding parameters), offset table,
# then verification key, signature, share hash chain, block hash tree,
# share data, and encrypted private key. Records offsets in self.offsets.
# NOTE(review): the struct.pack argument lines, several "".join items, the
# `self.offsets = {}` binding, and the final `return final_share` are
# missing from this extract — restore from upstream.
2509 def build_test_sdmf_share(self, empty=False):
2513 sharedata = self.segment * 6
2514 self.sharedata = sharedata
2515 blocksize = len(sharedata) / 3
2516 block = sharedata[:blocksize]
2517 self.blockdata = block
2518 prefix = struct.pack(">BQ32s16s BBQQ",
# Offsets are measured from the end of the fixed prefix + offset table.
2528 post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2529 signature_offset = post_offset + len(self.verification_key)
2530 sharehashes_offset = signature_offset + len(self.signature)
2531 blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
2532 sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
2533 encprivkey_offset = sharedata_offset + len(block)
2534 eof_offset = encprivkey_offset + len(self.encprivkey)
2535 offsets = struct.pack(">LLLLQQ",
2542 final_share = "".join([prefix,
2544 self.verification_key,
2546 self.share_hash_chain_s,
2547 self.block_hash_tree_s,
2551 self.offsets['signature'] = signature_offset
2552 self.offsets['share_hash_chain'] = sharehashes_offset
2553 self.offsets['block_hash_tree'] = blockhashes_offset
2554 self.offsets['share_data'] = sharedata_offset
2555 self.offsets['enc_privkey'] = encprivkey_offset
2556 self.offsets['EOF'] = eof_offset
# Write the SDMF test share built by build_test_sdmf_share to the storage
# server under `storage_index` and assert the write succeeded.
# NOTE(review): the signature continuation lines (storage_index,
# empty=False), the `tws = {}` and `readv = ...` bindings, and the trailing
# `return d` are missing from this extract — restore from upstream.
2559 def write_sdmf_share_to_server(self,
2562 # Some tests need SDMF shares to verify that we can still
2563 # read them. This method writes one, which resembles but is not
2565 write = self.aa.remote_slot_testv_and_readv_and_writev
2566 share = self.build_test_sdmf_share(empty)
# Test vector: succeed only if the slot is currently empty at offset 0.
2567 testvs = [(0, 1, "eq", "")]
2569 tws[0] = (testvs, [(0, share)], None)
2571 d = write(storage_index, self.secrets, tws, readv)
2572 d.addCallback(lambda res: self.failUnless(res[0]))
# End-to-end read test: write an MDMF share, then verify that every
# MDMFSlotReadProxy accessor (blocks/salts, encprivkey, hash trees,
# signature, verification key, seqnum, root hash, encoding parameters,
# checkstring) returns the fixture values it was built from.
# NOTE(review): the `for i in xrange(6):` loop header implied by lines
# 2589-2591, several single-argument call lines (get_signature, get_seqnum,
# get_root_hash), and the trailing `return d` are missing from this extract.
# Also, the assertion at line 2645 compares `checkstring` with itself — the
# upstream code presumably compares against self.checkstring; confirm.
2576 def test_read(self):
2577 self.init("test_read")
2579 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2580 d = defer.succeed(None)
2581 d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
2583 # Check that every method equals what we expect it to.
2584 def _check_block_and_salt((block, salt)):
2585 self.failUnlessEqual(block, self.block)
2586 self.failUnlessEqual(salt, self.salt)
2589 d.addCallback(lambda ignored, i=i:
2590 mr.get_block_and_salt(i))
2591 d.addCallback(_check_block_and_salt)
2593 d.addCallback(lambda ignored:
2594 mr.get_encprivkey())
2595 d.addCallback(lambda encprivkey:
2596 self.failUnlessEqual(self.encprivkey, encprivkey))
2598 d.addCallback(lambda ignored:
2599 mr.get_blockhashes())
2600 d.addCallback(lambda blockhashes:
2601 self.failUnlessEqual(self.block_hash_tree, blockhashes))
2603 d.addCallback(lambda ignored:
2604 mr.get_sharehashes())
2605 d.addCallback(lambda sharehashes:
2606 self.failUnlessEqual(self.share_hash_chain, sharehashes))
2608 d.addCallback(lambda ignored:
2610 d.addCallback(lambda signature:
2611 self.failUnlessEqual(signature, self.signature))
2613 d.addCallback(lambda ignored:
2614 mr.get_verification_key())
2615 d.addCallback(lambda verification_key:
2616 self.failUnlessEqual(verification_key, self.verification_key))
2618 d.addCallback(lambda ignored:
2620 d.addCallback(lambda seqnum:
2621 self.failUnlessEqual(seqnum, 0))
2623 d.addCallback(lambda ignored:
2625 d.addCallback(lambda root_hash:
2626 self.failUnlessEqual(self.root_hash, root_hash))
2628 d.addCallback(lambda ignored:
2630 d.addCallback(lambda seqnum:
2631 self.failUnlessEqual(0, seqnum))
2633 d.addCallback(lambda ignored:
2634 mr.get_encoding_parameters())
2635 def _check_encoding_parameters((k, n, segsize, datalen)):
2636 self.failUnlessEqual(k, 3)
2637 self.failUnlessEqual(n, 10)
2638 self.failUnlessEqual(segsize, 6)
2639 self.failUnlessEqual(datalen, 36)
2640 d.addCallback(_check_encoding_parameters)
2642 d.addCallback(lambda ignored:
2643 mr.get_checkstring())
2644 d.addCallback(lambda checkstring:
2645 self.failUnlessEqual(checkstring, checkstring))
# Verify that a share written with a shorter tail segment reads back a
# one-byte final block ("a") from segment index 5.
# NOTE(review): the trailing `return d` is missing from this extract.
2648 def test_read_with_different_tail_segment_size(self):
2649 self.init("test_read_with_different_tail_segment_size")
2651 mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2652 d = defer.succeed(None)
2653 d.addCallback(lambda ign: self.write_test_share_to_server("si1", tail_segment=True))
2655 d.addCallback(lambda ign: mr.get_block_and_salt(5))
2656 def _check_tail_segment(results):
2657 block, salt = results
2658 self.failUnlessEqual(len(block), 1)
2659 self.failUnlessEqual(block, "a")
2660 d.addCallback(_check_tail_segment)
def test_get_block_with_invalid_segnum(self):
    """Reading past the last segment (segnum 7 of a 6-segment share) raises LayoutInvalid."""
    self.init("test_get_block_with_invalid_segnum")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test invalid segnum",
                        None,
                        mr.get_block_and_salt, 7))
    return d
def test_get_encoding_parameters_first(self):
    """Encoding parameters can be fetched before any other field is read."""
    self.init("test_get_encoding_parameters_first")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ign: mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        # The test share describes a 36-byte file: 6-byte segments, k=3, N=10.
        # (Explicit unpack instead of a py2-only tuple parameter.)
        k, n, segment_size, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segment_size, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)
    return d
def test_get_seqnum_first(self):
    """The sequence number can be fetched before any other field."""
    self.init("test_get_seqnum_first")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ign: mr.get_seqnum())
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0))
    return d
def test_get_root_hash_first(self):
    """The root hash can be fetched before any other field."""
    self.init("test_root_hash_first")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ign: mr.get_root_hash())
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(root_hash, self.root_hash))
    return d
def test_get_checkstring_first(self):
    """The checkstring can be fetched before any other field."""
    self.init("test_checkstring_first")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ign: mr.get_checkstring())
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, self.checkstring))
    return d
def test_write_read_vectors(self):
    """A failed test-vector write returns a read vector identifying the old checkstring."""
    self.init("test_write_read_vectors")
    # When writing for us, the storage server will return to us a
    # read vector, along with its result. If a write fails because
    # the test vectors failed, this read vector can help us to
    # diagnose the problem. This test ensures that the read vector
    # is working appropriately.
    mw = self._make_new_mw("si1", 0)

    for i in xrange(6):
        mw.put_block(self.block, i, self.salt)
    mw.put_encprivkey(self.encprivkey)
    mw.put_blockhashes(self.block_hash_tree)
    mw.put_sharehashes(self.share_hash_chain)
    mw.put_root_hash(self.root_hash)
    mw.put_signature(self.signature)
    mw.put_verification_key(self.verification_key)

    d = mw.finish_publishing()
    def _then(results):
        # Fixed: the original `failUnless(len(results), 2)` used 2 as the
        # assertion message and so could never fail.
        self.failUnlessEqual(len(results), 2)
        result, readv = results
        self.failUnless(result)
        # Invalidate the checkstring so the next publish fails its test vector.
        self.old_checkstring = mw.get_checkstring()
        mw.set_checkstring("")
    d.addCallback(_then)
    d.addCallback(lambda ignored:
        mw.finish_publishing())
    def _then_again(results):
        self.failUnlessEqual(len(results), 2)
        result, readvs = results
        # The returned read vector holds the checkstring actually on disk.
        self.failUnlessIn(0, readvs)
        readv = readvs[0][0]
        self.failUnlessEqual(readv, self.old_checkstring)
    d.addCallback(_then_again)
    # The checkstring remains the same for the rest of the process.
    return d
def test_private_key_after_share_hash_chain(self):
    """Re-writing the private key after the share hash chain raises LayoutInvalid."""
    self.init("test_private_key_after_share_hash_chain")
    mw = self._make_new_mw("si1", 0)
    d = defer.succeed(None)
    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))

    # Now try to put the private key again.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test repeat private key",
                        None,
                        mw.put_encprivkey, self.encprivkey))
    return d
def test_signature_after_verification_key(self):
    """Writing the signature again after the verification key raises LayoutInvalid."""
    self.init("test_signature_after_verification_key")
    mw = self._make_new_mw("si1", 0)
    d = defer.succeed(None)
    # Put everything up to and including the verification key.
    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
        mw.put_root_hash(self.root_hash))
    d.addCallback(lambda ignored:
        mw.put_signature(self.signature))
    d.addCallback(lambda ignored:
        mw.put_verification_key(self.verification_key))
    # Now try to put the signature again. This should fail
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "signature after verification",
                        None,
                        mw.put_signature, self.signature))
    return d
def test_uncoordinated_write(self):
    """A second writer to the same slot fails its test vector after the first succeeds."""
    self.init("test_uncoordinated_write")
    # Make two mutable writers, both pointing to the same storage
    # server, both at the same storage index, and try writing to the
    # same share.
    mw1 = self._make_new_mw("si1", 0)
    mw2 = self._make_new_mw("si1", 0)

    def _check_success(results):
        result, readvs = results
        self.failUnless(result)

    def _check_failure(results):
        result, readvs = results
        self.failIf(result)

    def _write_share(mw):
        for i in xrange(6):
            mw.put_block(self.block, i, self.salt)
        mw.put_encprivkey(self.encprivkey)
        mw.put_blockhashes(self.block_hash_tree)
        mw.put_sharehashes(self.share_hash_chain)
        mw.put_root_hash(self.root_hash)
        mw.put_signature(self.signature)
        mw.put_verification_key(self.verification_key)
        return mw.finish_publishing()
    d = _write_share(mw1)
    d.addCallback(_check_success)
    d.addCallback(lambda ignored:
        _write_share(mw2))
    d.addCallback(_check_failure)
    return d
def test_invalid_salt_size(self):
    """Salts must be exactly 16 bytes; both 17 and 15 bytes are rejected."""
    self.init("test_invalid_salt_size")
    # Salts need to be 16 bytes in size. Writes that attempt to
    # write more or less than this should be rejected.
    mw = self._make_new_mw("si1", 0)
    invalid_salt = "a" * 17 # 17 bytes
    another_invalid_salt = "b" * 15 # 15 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "salt too big",
                        None,
                        mw.put_block, self.block, 0, invalid_salt))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "salt too small",
                        None,
                        mw.put_block, self.block, 0,
                        another_invalid_salt))
    return d
def test_write_test_vectors(self):
    """A bogus checkstring makes finish_publishing fail; clearing it lets it succeed."""
    self.init("test_write_test_vectors")
    # If we give the write proxy a bogus test vector at
    # any point during the process, it should fail to write when we
    # tell it to write.
    def _check_failure(results):
        self.failUnlessEqual(len(results), 2)
        result, readv = results
        self.failIf(result)

    def _check_success(results):
        self.failUnlessEqual(len(results), 2)
        result, readv = results
        # Fixed: the original asserted on the whole 2-tuple, which is
        # always truthy; assert on the success flag instead.
        self.failUnless(result)

    mw = self._make_new_mw("si1", 0)
    mw.set_checkstring("this is a lie")
    for i in xrange(6):
        mw.put_block(self.block, i, self.salt)
    mw.put_encprivkey(self.encprivkey)
    mw.put_blockhashes(self.block_hash_tree)
    mw.put_sharehashes(self.share_hash_chain)
    mw.put_root_hash(self.root_hash)
    mw.put_signature(self.signature)
    mw.put_verification_key(self.verification_key)
    d = mw.finish_publishing()
    d.addCallback(_check_failure)
    d.addCallback(lambda ignored:
        mw.set_checkstring(""))
    d.addCallback(lambda ignored:
        mw.finish_publishing())
    d.addCallback(_check_success)
    return d
def serialize_blockhashes(self, blockhashes):
    """Serialize the block-hash leaves by simple concatenation."""
    serialized = "".join(blockhashes)
    return serialized
def serialize_sharehashes(self, sharehashes):
    """Pack a {share_index: 32-byte hash} dict into concatenated (>H32s) records.

    Records are emitted in ascending index order so the serialization
    is deterministic.
    """
    ret = "".join([struct.pack(">H32s", i, sharehashes[i])
                   for i in sorted(sharehashes.keys())])
    # Fixed: the return statement was missing, so the function
    # silently returned None.
    return ret
def test_write(self):
    """Layout check: after publishing, every MDMF field sits at its computed offset."""
    self.init("test_write")
    # This translates to a file with 6 6-byte segments, and with 2-byte
    # blocks.
    mw = self._make_new_mw("si1", 0)
    # Test writing some blocks.
    read = self.aa.remote_slot_readv
    expected_private_key_offset = struct.calcsize(MDMFHEADER)
    expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
                                PRIVATE_KEY_SIZE + \
                                SIGNATURE_SIZE + \
                                VERIFICATION_KEY_SIZE + \
                                SHARE_HASH_CHAIN_SIZE
    written_block_size = 2 + len(self.salt)
    written_block = self.block + self.salt
    for i in xrange(6):
        mw.put_block(self.block, i, self.salt)

    mw.put_encprivkey(self.encprivkey)
    mw.put_blockhashes(self.block_hash_tree)
    mw.put_sharehashes(self.share_hash_chain)
    mw.put_root_hash(self.root_hash)
    mw.put_signature(self.signature)
    mw.put_verification_key(self.verification_key)

    d = mw.finish_publishing()
    # results is (success_flag, readv); index instead of a py2-only
    # tuple-parameter lambda.
    d.addCallback(lambda results: self.failUnless(results[0], "publish failed"))

    # Each block+salt pair is written contiguously starting at the
    # share-data offset.
    for i in xrange(6):
        d.addCallback(lambda ign, i=i: read("si1", [0],
                      [(expected_sharedata_offset + (i * written_block_size),
                        written_block_size)]))
        d.addCallback(lambda res: self.failUnlessEqual(res, {0: [written_block]}))

    # The private key occupies its reserved slot right after the header.
    d.addCallback(lambda ign: self.failUnlessEqual(len(self.encprivkey), 7))
    d.addCallback(lambda ign: read("si1", [0], [(expected_private_key_offset, 7)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.encprivkey]}))

    # The block hash tree follows the share data.
    expected_block_hash_offset = expected_sharedata_offset + (6 * written_block_size)
    d.addCallback(lambda ign: self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6))
    d.addCallback(lambda ign, ebho=expected_block_hash_offset:
                  read("si1", [0], [(ebho, 32 * 6)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.block_hash_tree_s]}))

    # The share hash chain follows the private key.
    expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
    d.addCallback(lambda ign, esho=expected_share_hash_offset:
                  read("si1", [0], [(esho, (32 + 2) * 6)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.share_hash_chain_s]}))

    # The root hash lives in the header at offset 9.
    d.addCallback(lambda ign: read("si1", [0], [(9, 32)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.root_hash]}))

    # The signature follows the share hash chain.
    expected_signature_offset = expected_share_hash_offset + len(self.share_hash_chain_s)
    d.addCallback(lambda ign: self.failUnlessEqual(len(self.signature), 9))
    d.addCallback(lambda ign, esigo=expected_signature_offset:
                  read("si1", [0], [(esigo, 9)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.signature]}))

    # The verification key follows the signature.
    expected_verification_key_offset = expected_signature_offset + len(self.signature)
    d.addCallback(lambda ign: self.failUnlessEqual(len(self.verification_key), 6))
    d.addCallback(lambda ign, evko=expected_verification_key_offset:
                  read("si1", [0], [(evko, 6)]))
    d.addCallback(lambda res: self.failUnlessEqual(res, {0: [self.verification_key]}))

    def _check_other_fields(ign, ebho=expected_block_hash_offset,
                            esho=expected_share_hash_offset,
                            esigo=expected_signature_offset,
                            evko=expected_verification_key_offset):
        # Decode the signable prefix and check the header fields, then
        # check each offset-table entry at its fixed header position.
        signable = mw.get_signable()
        verno, seq, roothash, k, N, segsize, datalen = struct.unpack(">BQ32sBBQQ",
                                                                     signable)
        self.failUnlessEqual(verno, 1)
        self.failUnlessEqual(seq, 0)
        self.failUnlessEqual(roothash, self.root_hash)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(N, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)

        def _check_field(res, offset, fmt, which, value):
            # Read the packed field at `offset` and compare against `value`.
            encoded = struct.pack(fmt, value)
            d3 = defer.succeed(None)
            d3.addCallback(lambda ign: read("si1", [0], [(offset, len(encoded))]))
            d3.addCallback(lambda res: self.failUnlessEqual(res, {0: [encoded]}, which))
            return d3

        d2 = defer.succeed(None)
        d2.addCallback(_check_field, 0, ">B", "version number", verno)
        d2.addCallback(_check_field, 1, ">Q", "sequence number", seq)
        d2.addCallback(_check_field, 41, ">B", "k", k)
        d2.addCallback(_check_field, 42, ">B", "N", N)
        d2.addCallback(_check_field, 43, ">Q", "segment size", segsize)
        d2.addCallback(_check_field, 51, ">Q", "data length", datalen)
        d2.addCallback(_check_field, 59, ">Q", "private key offset",
                       expected_private_key_offset)
        d2.addCallback(_check_field, 67, ">Q", "share hash offset", esho)
        d2.addCallback(_check_field, 75, ">Q", "signature offset", esigo)
        d2.addCallback(_check_field, 83, ">Q", "verification key offset", evko)
        d2.addCallback(_check_field, 91, ">Q", "end of verification key",
                       evko + len(self.verification_key))
        d2.addCallback(_check_field, 99, ">Q", "sharedata offset",
                       expected_sharedata_offset)
        d2.addCallback(_check_field, 107, ">Q", "block hash offset", ebho)
        d2.addCallback(_check_field, 115, ">Q", "eof offset",
                       ebho + len(self.block_hash_tree_s))
        return d2
    d.addCallback(_check_other_fields)
    return d
def _make_new_mw(self, si, share, datalength=36):
    """Return an MDMFSlotWriteProxy for slot `share` of storage index `si`.

    The default `datalength` of 36 with segment size 6 and k=3 yields
    6 segments split into 2-byte blocks.
    """
    # This is a file of size 36 bytes. Since it has a segment
    # size of 6, we know that it has 6 byte segments, which will
    # be split into blocks of 2 bytes because our FEC k
    # parameter is 3.
    mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
                            6, datalength)
    return mw
def test_write_rejected_with_too_many_blocks(self):
    """Writing a 7th block into a 6-block share raises LayoutInvalid."""
    self.init("test_write_rejected_with_too_many_blocks")
    mw = self._make_new_mw("si0", 0)

    # Try writing too many blocks. We should not be able to write
    # more than 6 blocks into each share.
    d = defer.succeed(None)
    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "too many blocks",
                        None,
                        mw.put_block, self.block, 7, self.salt))
    return d
def test_write_rejected_with_invalid_salt(self):
    """A 17-byte salt is rejected with LayoutInvalid."""
    self.init("test_write_rejected_with_invalid_salt")
    # Try writing an invalid salt. Salts are 16 bytes -- any more or
    # less should cause an error.
    mw = self._make_new_mw("si1", 0)
    bad_salt = "a" * 17 # 17 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test_invalid_salt",
                        None, mw.put_block, self.block, 7, bad_salt))
    return d
def test_write_rejected_with_invalid_root_hash(self):
    """A root hash that is not 32 bytes is rejected with LayoutInvalid."""
    self.init("test_write_rejected_with_invalid_root_hash")
    # Try writing an invalid root hash. This should be SHA256d, and
    # 32 bytes long as a result.
    mw = self._make_new_mw("si2", 0)
    # 17 bytes != 32 bytes
    invalid_root_hash = "a" * 17
    d = defer.succeed(None)
    # Before this test can work, we need to put some blocks + salts,
    # a block hash tree, and a share hash tree. Otherwise, we'll see
    # failures that match what we are looking for, but are caused by
    # the constraints imposed on operation ordering.
    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "invalid root hash",
                        None, mw.put_root_hash, invalid_root_hash))
    return d
def test_write_rejected_with_invalid_blocksize(self):
    """Non-tail blocks must be exactly 2 bytes; only the tail block may be shorter."""
    self.init("test_write_rejected_with_invalid_blocksize")
    # The blocksize implied by the writer that we get from
    # _make_new_mw is 2bytes -- any more or any less than this
    # should be cause for failure, unless it is the tail segment, in
    # which case it may not be failure.
    mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
                                         # a one-byte tail block
    invalid_block = "a"
    # 1 bytes != 2 bytes
    d = defer.succeed(None)
    d.addCallback(lambda ignored, invalid_block=invalid_block:
        self.shouldFail(LayoutInvalid, "test blocksize too small",
                        None, mw.put_block, invalid_block, 0,
                        self.salt))
    invalid_block = invalid_block * 3
    # 3 bytes != 2 bytes
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test blocksize too large",
                        None,
                        mw.put_block, invalid_block, 0, self.salt))
    # Fill the first five (non-tail) segments with valid blocks.
    for i in xrange(5):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    # Try to put an invalid tail segment
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test invalid tail segment",
                        None,
                        mw.put_block, self.block, 5, self.salt))
    valid_block = "a"
    d.addCallback(lambda ignored:
        mw.put_block(valid_block, 5, self.salt))
    return d
def test_write_enforces_order_constraints(self):
    """Out-of-order writes through MDMFSlotWriteProxy raise LayoutInvalid."""
    self.init("test_write_enforces_order_constraints")
    # We require that the MDMFSlotWriteProxy be interacted with in a
    # specific way; roughly:
    # 1: write blocks and salts
    # 2: Write the encrypted private key
    # 3: Write the block hashes
    # 4: Write the share hashes
    # 5: Write the root hash and salt hash
    # 6: Write the signature and verification key
    # 7: Write the file.
    #
    # Some of these can be performed out-of-order, and some can't.
    # The dependencies that I want to test here are:
    #  - Private key before block hashes
    #  - share hashes and block hashes before root hash
    #  - root hash before signature
    #  - signature before verification key
    mw0 = self._make_new_mw("si0", 0)

    d = defer.succeed(None)
    for i in xrange(6):
        d.addCallback(lambda ign, i=i:
            mw0.put_block(self.block, i, self.salt))

    # Try to write the share hash chain without writing the
    # encrypted private key
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "share hash chain before "
                                       "private key",
                        None,
                        lambda: mw0.put_sharehashes(self.share_hash_chain) ))

    # Write the private key.
    d.addCallback(lambda ign: mw0.put_encprivkey(self.encprivkey))

    # Now write the block hashes and try again
    d.addCallback(lambda ignored:
        mw0.put_blockhashes(self.block_hash_tree))

    # We haven't yet put the root hash on the share, so we shouldn't
    # be able to sign it.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "signature before root hash",
                        None,
                        lambda: mw0.put_signature(self.signature) ))

    d.addCallback(lambda ignored:
        self.failUnlessRaises(LayoutInvalid, mw0.get_signable))

    # ..and, since that fails, we also shouldn't be able to put the
    # verification key.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "key before signature",
                        None,
                        lambda: mw0.put_verification_key(self.verification_key) ))

    # Now write the share hashes.
    d.addCallback(lambda ign: mw0.put_sharehashes(self.share_hash_chain))

    # We should be able to write the root hash now too
    d.addCallback(lambda ign: mw0.put_root_hash(self.root_hash))

    # We should still be unable to put the verification key
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "key before signature",
                        None,
                        lambda: mw0.put_verification_key(self.verification_key) ))

    d.addCallback(lambda ign: mw0.put_signature(self.signature))

    # We shouldn't be able to write the offsets to the remote server
    # until the offset table is finished; IOW, until we have written
    # the verification key.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "offsets before verification key",
                        None,
                        mw0.finish_publishing))

    d.addCallback(lambda ignored:
        mw0.put_verification_key(self.verification_key))
    return d
def test_end_to_end(self):
    """Write a share with the writer proxy and read every field back with the reader."""
    self.init("test_end_to_end")
    mw = self._make_new_mw("si1", 0)
    # Write a share using the mutable writer, and make sure that the
    # reader knows how to read everything back to us.
    d = defer.succeed(None)
    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mw.put_block(self.block, i, self.salt))
    d.addCallback(lambda ignored:
        mw.put_encprivkey(self.encprivkey))
    d.addCallback(lambda ignored:
        mw.put_blockhashes(self.block_hash_tree))
    d.addCallback(lambda ignored:
        mw.put_sharehashes(self.share_hash_chain))
    d.addCallback(lambda ignored:
        mw.put_root_hash(self.root_hash))
    d.addCallback(lambda ignored:
        mw.put_signature(self.signature))
    d.addCallback(lambda ignored:
        mw.put_verification_key(self.verification_key))
    d.addCallback(lambda ignored:
        mw.finish_publishing())

    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    def _check_block_and_salt(block_and_salt):
        block, salt = block_and_salt
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)

    for i in xrange(6):
        d.addCallback(lambda ignored, i=i:
            mr.get_block_and_salt(i))
        d.addCallback(_check_block_and_salt)

    d.addCallback(lambda ignored:
        mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
        self.failUnlessEqual(self.encprivkey, encprivkey))

    d.addCallback(lambda ignored:
        mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
        self.failUnlessEqual(self.block_hash_tree, blockhashes))

    d.addCallback(lambda ignored:
        mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
        self.failUnlessEqual(self.share_hash_chain, sharehashes))

    d.addCallback(lambda ignored:
        mr.get_signature())
    d.addCallback(lambda signature:
        self.failUnlessEqual(signature, self.signature))

    d.addCallback(lambda ignored:
        mr.get_verification_key())
    d.addCallback(lambda verification_key:
        self.failUnlessEqual(verification_key, self.verification_key))

    d.addCallback(lambda ignored:
        mr.get_seqnum())
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0))

    d.addCallback(lambda ignored:
        mr.get_root_hash())
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(self.root_hash, root_hash))

    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
    d.addCallback(_check_encoding_parameters)

    d.addCallback(lambda ignored:
        mr.get_checkstring())
    d.addCallback(lambda checkstring:
        self.failUnlessEqual(checkstring, mw.get_checkstring()))
    return d
def test_is_sdmf(self):
    """is_sdmf() returns True for an SDMF share."""
    self.init("test_is_sdmf")
    # The MDMFSlotReadProxy should also know how to read SDMF files,
    # since it will encounter them on the grid. Callers use the
    # is_sdmf method to test this.
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
    d.addCallback(lambda ign: mr.is_sdmf())
    d.addCallback(lambda issdmf: self.failUnless(issdmf))
    return d
def test_reads_sdmf(self):
    """Every field of an SDMF share is readable through the MDMF read proxy."""
    self.init("test_reads_sdmf")
    # The slot read proxy should, naturally, know how to tell us
    # about data in the SDMF format
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
    d.addCallback(lambda ign: mr.is_sdmf())
    d.addCallback(lambda issdmf: self.failUnless(issdmf))

    # What do we need to read?
    #  - The sharedata
    #  - The salt
    d.addCallback(lambda ignored:
        mr.get_block_and_salt(0))
    def _check_block_and_salt(results):
        block, salt = results
        # Our original file is 36 bytes long. Then each share is 12
        # bytes in size. The share is composed entirely of the
        # letter a. self.block contains 2 as, so 6 * self.block is
        # what we are looking for.
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
    d.addCallback(_check_block_and_salt)

    #  - The blockhashes
    d.addCallback(lambda ignored:
        mr.get_blockhashes())
    d.addCallback(lambda blockhashes:
        self.failUnlessEqual(self.block_hash_tree,
                             blockhashes,
                             blockhashes))
    #  - The sharehashes
    d.addCallback(lambda ignored:
        mr.get_sharehashes())
    d.addCallback(lambda sharehashes:
        self.failUnlessEqual(self.share_hash_chain,
                             sharehashes))
    #  - The keys
    d.addCallback(lambda ignored:
        mr.get_encprivkey())
    d.addCallback(lambda encprivkey:
        self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
    d.addCallback(lambda ignored:
        mr.get_verification_key())
    d.addCallback(lambda verification_key:
        self.failUnlessEqual(verification_key,
                             self.verification_key,
                             verification_key))
    #  - The signature
    d.addCallback(lambda ignored:
        mr.get_signature())
    d.addCallback(lambda signature:
        self.failUnlessEqual(signature, self.signature, signature))

    # - The sequence number
    d.addCallback(lambda ignored:
        mr.get_seqnum())
    d.addCallback(lambda seqnum:
        self.failUnlessEqual(seqnum, 0, seqnum))

    # - The root hash
    d.addCallback(lambda ignored:
        mr.get_root_hash())
    d.addCallback(lambda root_hash:
        self.failUnlessEqual(root_hash, self.root_hash, root_hash))
    return d
def test_only_reads_one_segment_sdmf(self):
    """Asking for segment 1 of a one-segment SDMF share raises LayoutInvalid."""
    self.init("test_only_reads_one_segment_sdmf")
    # SDMF shares have only one segment, so it doesn't make sense to
    # read more segments than that. The reader should know this and
    # complain if we try to do that.
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
    d.addCallback(lambda ign: mr.is_sdmf())
    d.addCallback(lambda issdmf: self.failUnless(issdmf))
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "test bad segment",
                        None,
                        mr.get_block_and_salt, 1))
    return d
def test_read_with_prefetched_mdmf_data(self):
    """Prefetched MDMF header data satisfies verinfo reads with zero round trips."""
    self.init("test_read_with_prefetched_mdmf_data")
    # The MDMFSlotReadProxy will prefill certain fields if you pass
    # it data that you have already fetched. This is useful for
    # cases like the Servermap, which prefetches ~2kb of data while
    # finding out which shares are on the remote peer so that it
    # doesn't waste round trips.
    mdmf_data = self.build_test_mdmf_share()
    def _make_mr(ignored, length):
        # Build a reader primed with the first `length` bytes of the share.
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
        return mr

    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))

    # This should be enough to fill in both the encoding parameters
    # and the table of offsets, which will complete the version
    # information tuple.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_verinfo())
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        (seqnum,
         root_hash,
         ign,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
                                      1,
                                      seqnum,
                                      root_hash,
                                      k,
                                      n,
                                      segsize,
                                      datalen)
        self.failUnlessEqual(expected_prefix, prefix)
        # Everything came out of the prefetched data: no remote reads.
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)

    # This is not enough data to read a block and a share, so the
    # wrapper should attempt to read this from the remote server.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    def _check_block_and_salt(block_and_salt):
        block, salt = block_and_salt
        self.failUnlessEqual(block, self.block)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(self.rref.read_count, 1)
    d.addCallback(_check_block_and_salt)

    # This should be enough data to read one block.
    d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
    return d
def test_read_with_prefetched_sdmf_data(self):
    """Prefetched SDMF data fills the verinfo cache; share data still needs remote reads."""
    self.init("test_read_with_prefetched_sdmf_data")
    sdmf_data = self.build_test_sdmf_share()
    def _make_mr(ignored, length):
        # Build a reader primed with the first `length` bytes of the share.
        mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
        return mr

    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))

    # This should be enough to get us the encoding parameters,
    # offset table, and everything else we need to build a verinfo
    # tuple.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_verinfo())
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        (seqnum,
         root_hash,
         salt,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        expected_prefix = struct.pack(SIGNED_PREFIX,
                                      0,
                                      seqnum,
                                      root_hash,
                                      salt,
                                      k,
                                      n,
                                      segsize,
                                      datalen)
        self.failUnlessEqual(expected_prefix, prefix)
        self.failUnlessEqual(self.rref.read_count, 0)
    d.addCallback(_check_verinfo)
    # This shouldn't be enough to read any share data.
    d.addCallback(_make_mr, 123)
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    def _check_block_and_salt(block_and_salt):
        block, salt = block_and_salt
        self.failUnlessEqual(block, self.block * 6)
        self.failUnlessEqual(salt, self.salt)
        # TODO: Fix the read routine so that it reads only the data
        # that it has cached if it can't read all of it.
        self.failUnlessEqual(self.rref.read_count, 2)
    d.addCallback(_check_block_and_salt)

    # This should be enough to read share data.
    d.addCallback(_make_mr, self.offsets['share_data'])
    d.addCallback(lambda mr:
        mr.get_block_and_salt(0))
    d.addCallback(_check_block_and_salt)
    return d
def test_read_with_empty_mdmf_file(self):
    """An empty MDMF share reports zero segsize/datalen and has no blocks to read."""
    self.init("test_read_with_empty_mdmf_file")
    # Some tests upload a file with no contents to test things
    # unrelated to the actual handling of the content of the file.
    # The reader should behave intelligently in these cases.
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1", empty=True))
    # We should be able to get the encoding parameters, and they
    # should be correct.
    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)

    # We should not be able to fetch a block, since there are no
    # blocks to fetch.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "get block on empty file",
                        None,
                        mr.get_block_and_salt, 0))
    return d
def test_read_with_empty_sdmf_file(self):
    """An empty SDMF share reports zero segsize/datalen and has no blocks to read."""
    self.init("test_read_with_empty_sdmf_file")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1", empty=True))
    # We should be able to get the encoding parameters, and they
    # should be correct.
    d.addCallback(lambda ignored:
        mr.get_encoding_parameters())
    def _check_encoding_parameters(params):
        self.failUnlessEqual(len(params), 4)
        k, n, segsize, datalen = params
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        self.failUnlessEqual(segsize, 0)
        self.failUnlessEqual(datalen, 0)
    d.addCallback(_check_encoding_parameters)

    # It does not make sense to get a block in this format, so we
    # should not be able to.
    d.addCallback(lambda ignored:
        self.shouldFail(LayoutInvalid, "get block on an empty file",
                        None,
                        mr.get_block_and_salt, 0))
    return d
def test_verinfo_with_sdmf_file(self):
    """get_verinfo() on an SDMF share returns the full 9-tuple, including the salt."""
    self.init("test_verinfo_with_sdmf_file")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    # We should be able to get the version information.
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
    d.addCallback(lambda ignored:
        mr.get_verinfo())
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        (seqnum,
         root_hash,
         salt,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(salt, self.salt)
        self.failUnlessEqual(segsize, 36)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # SDMF signed prefix: version 0, then seqnum, root hash, salt,
        # and the encoding parameters.
        expected_prefix = struct.pack(">BQ32s16s BBQQ",
                                      0,
                                      seqnum,
                                      root_hash,
                                      salt,
                                      k,
                                      n,
                                      segsize,
                                      datalen)
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
    return d
def test_verinfo_with_mdmf_file(self):
    """get_verinfo() on an MDMF share returns the 9-tuple (no salt checked)."""
    self.init("test_verinfo_with_mdmf_file")
    mr = MDMFSlotReadProxy(self.rref, "si1", 0)
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.write_test_share_to_server("si1"))
    d.addCallback(lambda ignored:
        mr.get_verinfo())
    def _check_verinfo(verinfo):
        self.failUnless(verinfo)
        self.failUnlessEqual(len(verinfo), 9)
        (seqnum,
         root_hash,
         ign,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo
        self.failUnlessEqual(seqnum, 0)
        self.failUnlessEqual(root_hash, self.root_hash)
        self.failUnlessEqual(segsize, 6)
        self.failUnlessEqual(datalen, 36)
        self.failUnlessEqual(k, 3)
        self.failUnlessEqual(n, 10)
        # MDMF signable prefix: version 1, seqnum, root hash, and the
        # encoding parameters (no per-share salt in the prefix).
        expected_prefix = struct.pack(">BQ32s BBQQ",
                                      1,
                                      seqnum,
                                      root_hash,
                                      k,
                                      n,
                                      segsize,
                                      datalen)
        self.failUnlessEqual(prefix, expected_prefix)
        self.failUnlessEqual(offsets, self.offsets)
    d.addCallback(_check_verinfo)
    return d
# Drives SDMFSlotWriteProxy through a full publish: stage all fields
# (block+salt, encprivkey, hash chains, root hash, signature, verification
# key), confirm nothing is written until finish_publishing(), then read the
# slot back and compare against build_test_sdmf_share().
# NOTE(review): the SDMFSlotWriteProxy constructor argument lines
# (original 3680-3683) are missing from this extract; code kept verbatim.
3669 def test_sdmf_writer(self):
3670 self.init("test_sdmf_writer")
3672 # Go through the motions of writing an SDMF share to the storage
3673 # server. Then read the storage server to see that the share got
3674 # written in the way that we think it should have.
3676 # We do this first so that the necessary instance variables get
3677 # set the way we want them for the tests below.
3678 data = self.build_test_sdmf_share()
3679 sdmfr = SDMFSlotWriteProxy(0,
3684 # Put the block and salt.
3685 sdmfr.put_block(self.blockdata, 0, self.salt)
3687 # Put the encprivkey
3688 sdmfr.put_encprivkey(self.encprivkey)
3690 # Put the block and share hash chains
3691 sdmfr.put_blockhashes(self.block_hash_tree)
3692 sdmfr.put_sharehashes(self.share_hash_chain)
3693 sdmfr.put_root_hash(self.root_hash)
3696 sdmfr.put_signature(self.signature)
3698 # Put the verification key
3699 sdmfr.put_verification_key(self.verification_key)
3701 # Now check to make sure that nothing has been written yet.
3702 self.failUnlessEqual(self.rref.write_count, 0)
3704 # Now finish publishing
3705 d = sdmfr.finish_publishing()
# SDMF batches everything into a single remote write.
3706 d.addCallback(lambda ign: self.failUnlessEqual(self.rref.write_count, 1))
3707 d.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(0, len(data))]))
3708 d.addCallback(lambda res: self.failUnlessEqual(res, {0: [data]}))
# Publishing over an existing SDMF share requires the right checkstring:
# the first finish_publishing() fails (test vector mismatch) but returns
# the server's current checkstring; after set_checkstring() the second
# attempt succeeds, bumping seqnum to 1 while leaving the rest of the
# share data intact.
# NOTE(review): the SDMFSlotWriteProxy constructor argument lines
# (original 3722-3725) and the `def _then(results):` / `def _written(ign):`
# header lines are missing from this extract; code kept verbatim.
3711 def test_sdmf_writer_preexisting_share(self):
3712 self.init("test_sdmf_writer_preexisting_share")
3714 data = self.build_test_sdmf_share()
3715 d = defer.succeed(None)
3716 d.addCallback(lambda ign: self.write_sdmf_share_to_server("si1"))
3718 # Now there is a share on the storage server. To successfully
3719 # write, we need to set the checkstring correctly. When we
3720 # don't, no write should occur.
3721 sdmfw = SDMFSlotWriteProxy(0,
3726 sdmfw.put_block(self.blockdata, 0, self.salt)
3728 # Put the encprivkey
3729 sdmfw.put_encprivkey(self.encprivkey)
3731 # Put the block and share hash chains
3732 sdmfw.put_blockhashes(self.block_hash_tree)
3733 sdmfw.put_sharehashes(self.share_hash_chain)
3736 sdmfw.put_root_hash(self.root_hash)
3739 sdmfw.put_signature(self.signature)
3741 # Put the verification key
3742 sdmfw.put_verification_key(self.verification_key)
3744 # We shouldn't have a checkstring yet
3745 self.failUnlessEqual(sdmfw.get_checkstring(), "")
3747 d2 = sdmfw.finish_publishing()
# First publish must fail against the preexisting share; results[1]
# carries the server's actual checkstring for the retry below.
3749 self.failIf(results[0])
3750 # this is the correct checkstring
3751 self._expected_checkstring = results[1][0][0]
3752 return self._expected_checkstring
3753 d2.addCallback(_then)
3754 d2.addCallback(sdmfw.set_checkstring)
3755 d2.addCallback(lambda ign: sdmfw.get_checkstring())
3756 d2.addCallback(lambda checkstring: self.failUnlessEqual(checkstring,
3757 self._expected_checkstring))
3758 d2.addCallback(lambda ign: sdmfw.finish_publishing())
3759 d2.addCallback(lambda res: self.failUnless(res[0], res))
# Byte 1..8 of the slot holds the big-endian seqnum, now 1.
3760 d2.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(1, 8)]))
3761 d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [struct.pack(">Q", 1)]}))
3762 d2.addCallback(lambda ign: self.aa.remote_slot_readv("si1", [0], [(9, len(data) - 9)]))
3763 d2.addCallback(lambda res: self.failUnlessEqual(res, {0: [data[9:]]}))
3765 d.addCallback(_written)
# Exercises StorageServer latency bookkeeping: add_latency() samples are
# capped at the most recent 1000 per category, and get_latencies() reports
# mean plus percentile buckets, returning None for any percentile that
# cannot be computed from the number of samples on hand.
3769 class Stats(WithDiskBackend, unittest.TestCase):
3770 def test_latencies(self):
3771 server = self.create("test_latencies")
3772 for i in range(10000):
3773 server.add_latency("allocate", 1.0 * i)
3774 for i in range(1000):
3775 server.add_latency("renew", 1.0 * i)
# NOTE(review): the loop headers that feed the "write" and "cancel"
# samples (original 3776/3778) are missing from this extract; the
# assertions below imply 20 "write" and 10 "cancel" samples.
3777 server.add_latency("write", 1.0 * i)
3779 server.add_latency("cancel", 2.0 * i)
3780 server.add_latency("get", 5.0)
3782 output = server.get_latencies()
3784 self.failUnlessEqual(sorted(output.keys()),
3785 sorted(["allocate", "renew", "cancel", "write", "get"]))
# Only the most recent 1000 of the 10000 samples are retained, so the
# "allocate" stats reflect values 9000..9999.
3786 self.failUnlessEqual(len(server.latencies["allocate"]), 1000)
3787 self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1, output)
3788 self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
3789 self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
3790 self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
3791 self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
3792 self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
3793 self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
3794 self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
3796 self.failUnlessEqual(len(server.latencies["renew"]), 1000)
3797 self.failUnless(abs(output["renew"]["mean"] - 500) < 1, output)
3798 self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1, output)
3799 self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
3800 self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
3801 self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
3802 self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
3803 self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
3804 self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
# With only 20 samples, the extreme percentiles are not computable and
# are reported as None.
3806 self.failUnlessEqual(len(server.latencies["write"]), 20)
3807 self.failUnless(abs(output["write"]["mean"] - 9) < 1, output)
3808 self.failUnless(output["write"]["01_0_percentile"] is None, output)
3809 self.failUnless(abs(output["write"]["10_0_percentile"] - 2) < 1, output)
3810 self.failUnless(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
3811 self.failUnless(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
3812 self.failUnless(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
3813 self.failUnless(output["write"]["99_0_percentile"] is None, output)
3814 self.failUnless(output["write"]["99_9_percentile"] is None, output)
3816 self.failUnlessEqual(len(server.latencies["cancel"]), 10)
3817 self.failUnless(abs(output["cancel"]["mean"] - 9) < 1, output)
3818 self.failUnless(output["cancel"]["01_0_percentile"] is None, output)
3819 self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1, output)
3820 self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
3821 self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
3822 self.failUnless(output["cancel"]["95_0_percentile"] is None, output)
3823 self.failUnless(output["cancel"]["99_0_percentile"] is None, output)
3824 self.failUnless(output["cancel"]["99_9_percentile"] is None, output)
# A single sample yields no mean and no percentiles at all.
3826 self.failUnlessEqual(len(server.latencies["get"]), 1)
3827 self.failUnless(output["get"]["mean"] is None, output)
3828 self.failUnless(output["get"]["01_0_percentile"] is None, output)
3829 self.failUnless(output["get"]["10_0_percentile"] is None, output)
3830 self.failUnless(output["get"]["50_0_percentile"] is None, output)
3831 self.failUnless(output["get"]["90_0_percentile"] is None, output)
3832 self.failUnless(output["get"]["95_0_percentile"] is None, output)
3833 self.failUnless(output["get"]["99_0_percentile"] is None, output)
3834 self.failUnless(output["get"]["99_9_percentile"] is None, output)
# NOTE(review): body of a module-level helper (presumably
# `def remove_tags(s):` on the missing original line 3837, and a
# `return s` after it) used by the web-status tests below: it strips
# HTML tags and collapses runs of whitespace so assertions can match
# against the rendered page's plain text.
3838 s = re.sub(r'<[^>]*>', ' ', s)
3839 s = re.sub(r'\s+', ' ', s)
# Tests for the bucket-counting crawler attached to a disk-backed
# StorageServer: web-status rendering before/during/after a count cycle,
# cleanup of stale crawler state, and ETA reporting.
3843 class BucketCounterTest(WithDiskBackend, CrawlerTestMixin, ReallyEqualMixin, unittest.TestCase):
3844 def test_bucket_counter(self):
3845 server = self.create("test_bucket_counter", detached=True)
3846 bucket_counter = server.bucket_counter
3848 # finish as fast as possible
3849 bucket_counter.slow_start = 0
3850 bucket_counter.cpu_slice = 100.0
3852 d = server.bucket_counter.set_hook('after_prefix')
3854 server.setServiceParent(self.sparent)
3856 w = StorageStatus(server)
3858 # this sample is before the crawler has started doing anything
3859 html = w.renderSynchronously()
3860 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
3861 s = remove_tags(html)
3862 self.failUnlessIn("Accepting new shares: Yes", s)
3863 self.failUnlessIn("Reserved space: - 0 B (0)", s)
3864 self.failUnlessIn("Total sharesets: Not computed yet", s)
3865 self.failUnlessIn("Next crawl in", s)
3867 def _after_first_prefix(prefix):
3868 server.bucket_counter.save_state()
3869 state = bucket_counter.get_state()
3870 self.failUnlessEqual(prefix, state["last-complete-prefix"])
3871 self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
# Mid-cycle, the status page should show crawl progress.
3873 html = w.renderSynchronously()
3874 s = remove_tags(html)
3875 self.failUnlessIn(" Current crawl ", s)
3876 self.failUnlessIn(" (next work in ", s)
3878 return bucket_counter.set_hook('after_cycle')
3879 d.addCallback(_after_first_prefix)
3881 def _after_first_cycle(cycle):
3882 self.failUnlessEqual(cycle, 0)
3883 progress = bucket_counter.get_progress()
3884 self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
3885 d.addCallback(_after_first_cycle)
3886 d.addBoth(self._wait_for_yield, bucket_counter)
3888 def _after_yield(ign):
3889 html = w.renderSynchronously()
3890 s = remove_tags(html)
3891 self.failUnlessIn("Total sharesets: 0 (the number of", s)
# Allow for timing jitter: either 59 or 60 minutes is acceptable.
3892 self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
3893 d.addCallback(_after_yield)
3896 def test_bucket_counter_cleanup(self):
3897 server = self.create("test_bucket_counter_cleanup", detached=True)
3898 bucket_counter = server.bucket_counter
3900 # finish as fast as possible
3901 bucket_counter.slow_start = 0
3902 bucket_counter.cpu_slice = 100.0
3904 d = bucket_counter.set_hook('after_prefix')
3906 server.setServiceParent(self.sparent)
3908 def _after_first_prefix(prefix):
3909 bucket_counter.save_state()
3910 state = bucket_counter.state
3911 self.failUnlessEqual(prefix, state["last-complete-prefix"])
3912 self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
3914 # now sneak in and mess with its state, to make sure it cleans up
3915 # properly at the end of the cycle
# -12 is a deliberately bogus cycle key; it must be gone after the cycle.
3916 state["bucket-counts"][-12] = {}
3917 bucket_counter.save_state()
3919 return bucket_counter.set_hook('after_cycle')
3920 d.addCallback(_after_first_prefix)
3922 def _after_first_cycle(cycle):
3923 self.failUnlessEqual(cycle, 0)
3924 progress = bucket_counter.get_progress()
3925 self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
3927 s = bucket_counter.get_state()
3928 self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
3929 d.addCallback(_after_first_cycle)
3930 d.addBoth(self._wait_for_yield, bucket_counter)
3933 def test_bucket_counter_eta(self):
3934 server = self.create("test_bucket_counter_eta", detached=True)
3935 bucket_counter = server.bucket_counter
3937 # finish as fast as possible
3938 bucket_counter.slow_start = 0
3939 bucket_counter.cpu_slice = 100.0
3941 d = bucket_counter.set_hook('after_prefix')
3943 server.setServiceParent(self.sparent)
3945 w = StorageStatus(server)
3947 def _check_1(prefix1):
3948 # no ETA is available yet
3949 html = w.renderSynchronously()
3950 s = remove_tags(html)
3951 self.failUnlessIn("complete (next work", s)
3953 return bucket_counter.set_hook('after_prefix')
3954 d.addCallback(_check_1)
3956 def _check_2(prefix2):
3957 # an ETA based upon elapsed time should be available.
3958 html = w.renderSynchronously()
3959 s = remove_tags(html)
3960 self.failUnlessIn("complete (ETA ", s)
3961 d.addCallback(_check_2)
3962 d.addBoth(self._wait_for_yield, bucket_counter)
# Mixin of accounting-crawler tests (concrete backends are bound by the
# subclasses at the bottom of the file). make_shares() populates the
# server with 2 immutable and 2 mutable shares, some carrying an extra
# starter-account lease, and records the SIs and lease secrets on self.
# NOTE(review): this extract is missing the `def make(si):` header
# (original ~3970-3971), the `sharenums` definition, the remote_close call
# in _got_buckets, and the trailing `return d`; code kept verbatim.
3966 class AccountingCrawlerTest(CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
3967 def make_shares(self, server):
3968 aa = server.get_accountant().get_anonymous_account()
3969 sa = server.get_accountant().get_starter_account()
3972 return (si, hashutil.tagged_hash("renew", si),
3973 hashutil.tagged_hash("cancel", si))
3974 def make_mutable(si):
3975 return (si, hashutil.tagged_hash("renew", si),
3976 hashutil.tagged_hash("cancel", si),
3977 hashutil.tagged_hash("write-enabler", si))
3978 def make_extra_lease(si, num):
3979 return (hashutil.tagged_hash("renew-%d" % num, si),
3980 hashutil.tagged_hash("cancel-%d" % num, si))
3982 writev = aa.remote_slot_testv_and_readv_and_writev
3984 immutable_si_0, rs0, cs0 = make("\x00" * 16)
3985 immutable_si_1, rs1, cs1 = make("\x01" * 16)
3986 rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
3987 mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
3988 mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
3989 rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
3991 canary = FakeCanary()
3992 # note: 'tahoe debug dump-share' will not handle this file, since the
3993 # inner contents are not a valid CHK share
3994 data = "\xff" * 1000
3996 self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
3997 self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
3998 self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
4000 d = defer.succeed(None)
4001 d.addCallback(lambda ign: aa.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
# Python 2 tuple-parameter syntax: unpacks (alreadygot, writers).
4003 def _got_buckets( (a, w) ):
4004 w[0].remote_write(0, data)
4006 d.addCallback(_got_buckets)
4008 d.addCallback(lambda ign: aa.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
4010 d.addCallback(_got_buckets)
4011 d.addCallback(lambda ign: sa.remote_add_lease(immutable_si_1, rs1a, cs1a))
4013 d.addCallback(lambda ign: writev(mutable_si_2, (we2, rs2, cs2),
4014 {0: ([], [(0,data)], len(data))}, []))
4015 d.addCallback(lambda ign: writev(mutable_si_3, (we3, rs3, cs3),
4016 {0: ([], [(0,data)], len(data))}, []))
4017 d.addCallback(lambda ign: sa.remote_add_lease(mutable_si_3, rs3a, cs3a))
# Runs one full accounting-crawler cycle with expiration disabled and
# checks crawler state before / during / after the cycle, the rendered
# web status, and that no leases or shares were removed.
# NOTE(review): the `def _do_test(ign):` wrapper header (referenced by
# `d.addCallback(_do_test)` at the end) and assorted lines (e.g. the
# crawler speed-up settings after "finish as fast as possible") are
# missing from this extract; code kept verbatim.
4020 def test_basic(self):
4021 server = self.create("test_basic", detached=True)
4023 ep = ExpirationPolicy(enabled=False)
4024 server.get_accountant().set_expiration_policy(ep)
4025 aa = server.get_accountant().get_anonymous_account()
4026 sa = server.get_accountant().get_starter_account()
4028 # finish as fast as possible
4029 ac = server.get_accounting_crawler()
4033 webstatus = StorageStatus(server)
4035 # create a few shares, with some leases on them
4036 d = self.make_shares(server)
4038 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4040 if isinstance(server.backend, DiskBackend):
4041 # add a non-sharefile to exercise another code path
4042 fn = os.path.join(server.backend._sharedir,
4043 storage_index_to_dir(immutable_si_0),
4045 fileutil.write(fn, "I am not a share.\n")
4047 # this is before the crawl has started, so we're not in a cycle yet
4048 initial_state = ac.get_state()
4049 self.failIf(ac.get_progress()["cycle-in-progress"])
4050 self.failIfIn("cycle-to-date", initial_state)
4051 self.failIfIn("estimated-remaining-cycle", initial_state)
4052 self.failIfIn("estimated-current-cycle", initial_state)
4053 self.failUnlessIn("history", initial_state)
4054 self.failUnlessEqual(initial_state["history"], {})
4056 server.setServiceParent(self.sparent)
4060 # now examine the state right after the 'aa' prefix has been processed.
4061 d2 = self._after_prefix(None, 'aa', ac)
4062 def _after_aa_prefix(state):
4063 self.failUnlessIn("cycle-to-date", state)
4064 self.failUnlessIn("estimated-remaining-cycle", state)
4065 self.failUnlessIn("estimated-current-cycle", state)
4066 self.failUnlessIn("history", state)
4067 self.failUnlessEqual(state["history"], {})
4069 so_far = state["cycle-to-date"]
4070 self.failUnlessEqual(so_far["expiration-enabled"], False)
4071 self.failUnlessIn("configured-expiration-mode", so_far)
4072 self.failUnlessIn("lease-age-histogram", so_far)
4073 lah = so_far["lease-age-histogram"]
4074 self.failUnlessEqual(type(lah), list)
4075 self.failUnlessEqual(len(lah), 1)
# All leases are brand-new, so they land in the single 0..DAY bucket.
4076 self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
4077 self.failUnlessEqual(so_far["corrupt-shares"], [])
4078 sr1 = so_far["space-recovered"]
4079 self.failUnlessEqual(sr1["examined-buckets"], 1)
4080 self.failUnlessEqual(sr1["examined-shares"], 1)
4081 self.failUnlessEqual(sr1["actual-shares"], 0)
4082 left = state["estimated-remaining-cycle"]
4083 sr2 = left["space-recovered"]
4084 self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
4085 self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
4086 self.failIfEqual(sr2["actual-shares"], None)
4087 d2.addCallback(_after_aa_prefix)
4089 d2.addCallback(lambda ign: self.render1(webstatus))
4090 def _check_html_in_cycle(html):
4091 s = remove_tags(html)
4092 self.failUnlessIn("So far, this cycle has examined "
4093 "1 shares in 1 sharesets (0 mutable / 1 immutable) ", s)
4094 self.failUnlessIn("and has recovered: "
4095 "0 shares, 0 sharesets (0 mutable / 0 immutable), "
4096 "0 B (0 B / 0 B)", s)
4098 return ac.set_hook('after_cycle')
4099 d2.addCallback(_check_html_in_cycle)
4101 def _after_first_cycle(cycle):
4102 # After the first cycle, nothing should have been removed.
4103 self.failUnlessEqual(cycle, 0)
4104 progress = ac.get_progress()
4105 self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
# NOTE(review): the `s = ac.get_state()` line appears to be missing
# from this extract (gap before original 4108).
4108 self.failIf("cycle-to-date" in s)
4109 self.failIf("estimated-remaining-cycle" in s)
4110 self.failIf("estimated-current-cycle" in s)
4111 last = s["history"][0]
4112 self.failUnlessEqual(type(last), dict, repr(last))
4113 self.failUnlessIn("cycle-start-finish-times", last)
4114 self.failUnlessEqual(type(last["cycle-start-finish-times"]), list, repr(last))
4115 self.failUnlessEqual(last["expiration-enabled"], False)
4116 self.failUnlessIn("configured-expiration-mode", last)
4118 self.failUnlessIn("lease-age-histogram", last)
4119 lah = last["lease-age-histogram"]
4120 self.failUnlessEqual(type(lah), list)
4121 self.failUnlessEqual(len(lah), 1)
# Six leases total: 4 anonymous-account + 2 extra starter-account leases.
4122 self.failUnlessEqual(tuple(lah[0]), (0.0, DAY, 6) )
4124 self.failUnlessEqual(last["corrupt-shares"], [])
4126 rec = last["space-recovered"]
4127 self.failUnlessEqual(rec["examined-buckets"], 4)
4128 self.failUnlessEqual(rec["examined-shares"], 4)
4129 self.failUnlessEqual(rec["actual-buckets"], 0)
4130 self.failUnlessEqual(rec["actual-shares"], 0)
4131 self.failUnlessEqual(rec["actual-diskbytes"], 0)
4133 def count_leases(si):
4134 return (len(aa.get_leases(si)), len(sa.get_leases(si)))
4135 self.failUnlessEqual(count_leases(immutable_si_0), (1, 0))
4136 self.failUnlessEqual(count_leases(immutable_si_1), (1, 1))
4137 self.failUnlessEqual(count_leases(mutable_si_2), (1, 0))
4138 self.failUnlessEqual(count_leases(mutable_si_3), (1, 1))
4139 d2.addCallback(_after_first_cycle)
4141 d2.addCallback(lambda ign: self.render1(webstatus))
4142 def _check_html_after_cycle(html):
4143 s = remove_tags(html)
4144 self.failUnlessIn("recovered: 0 shares, 0 sharesets "
4145 "(0 mutable / 0 immutable), 0 B (0 B / 0 B) ", s)
4146 self.failUnlessIn("and saw a total of 4 shares, 4 sharesets "
4147 "(2 mutable / 2 immutable),", s)
4148 self.failUnlessIn("but expiration was not enabled", s)
4149 d2.addCallback(_check_html_after_cycle)
4151 d2.addCallback(lambda ign: self.render_json(webstatus))
4152 def _check_json_after_cycle(json):
4153 data = simplejson.loads(json)
4154 self.failUnlessIn("lease-checker", data)
4155 self.failUnlessIn("lease-checker-progress", data)
4156 d2.addCallback(_check_json_after_cycle)
4157 d2.addBoth(self._wait_for_yield, ac)
4159 d.addCallback(_do_test)
# Helper: assert the shareset for `si` holds exactly `expected`
# uncorrupted shares (and no corrupted ones).
# NOTE(review): the trailing `return d` (original ~4169) is missing from
# this extract; code kept verbatim.
4162 def _assert_sharecount(self, server, si, expected):
4163 d = defer.succeed(None)
4164 d.addCallback(lambda ign: server.backend.get_shareset(si).get_shares())
4165 def _got_shares( (shares, corrupted) ):
4166 self.failUnlessEqual(len(shares), expected, "share count for %r" % (si,))
4167 self.failUnlessEqual(len(corrupted), 0, str(corrupted))
4168 d.addCallback(_got_shares)
# Helper: assert the (anonymous-account, starter-account) lease counts
# for `si` match the `expected` 2-tuple.
# NOTE(review): the closing argument line of failUnlessEqual (original
# ~4175, presumably `expected)`) is missing from this extract.
4171 def _assert_leasecount(self, server, si, expected):
4172 aa = server.get_accountant().get_anonymous_account()
4173 sa = server.get_accountant().get_starter_account()
4174 self.failUnlessEqual((len(aa.get_leases(si)), len(sa.get_leases(si))),
# With mode="age" expiration (leases older than 2000s expire), back-dates
# the first lease on every share to 3000s ago: shares whose only lease was
# back-dated (immutable_si_0, mutable_si_2) are deleted; shares with an
# extra fresh lease survive with one lease left.
# NOTE(review): the `def _do_test(ign):` and `def _then(ign):` wrapper
# headers (referenced at the end) are missing from this extract.
4177 def test_expire_age(self):
4178 server = self.create("test_expire_age", detached=True)
4180 # setting expiration_time to 2000 means that any lease which is more
4181 # than 2000s old will be expired.
4183 ep = ExpirationPolicy(enabled=True, mode="age", override_lease_duration=2000)
4184 server.get_accountant().set_expiration_policy(ep)
4185 aa = server.get_accountant().get_anonymous_account()
4186 sa = server.get_accountant().get_starter_account()
4188 # finish as fast as possible
4189 ac = server.get_accounting_crawler()
4193 webstatus = StorageStatus(server)
4195 # create a few shares, with some leases on them
4196 d = self.make_shares(server)
4198 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4200 d2 = defer.succeed(None)
4201 d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 1))
4202 d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_0, (1, 0)))
4203 d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4204 d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 1)))
4205 d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2, 1))
4206 d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_2, (1, 0)))
4207 d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3, 1))
4208 d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3, (1, 1)))
4211 # artificially crank back the renewal time on the first lease of each
4212 # share to 3000s ago, and set the expiration time to 31 days later.
4213 new_renewal_time = now - 3000
4214 new_expiration_time = new_renewal_time + 31*24*60*60
4216 # Some shares have an extra lease which is set to expire at the
4217 # default time in 31 days from now (age=31days). We then run the
4218 # crawler, which will expire the first lease, making some shares get
4219 # deleted and others stay alive (with one remaining lease)
4221 aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
4223 # immutable_si_1 gets an extra lease
4224 sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
4226 aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time)
4228 # mutable_si_3 gets an extra lease
4229 sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time)
4231 server.setServiceParent(self.sparent)
4233 # now examine the web status right after the 'aa' prefix has been processed.
4234 d3 = self._after_prefix(None, 'aa', ac)
4235 d3.addCallback(lambda ign: self.render1(webstatus))
4236 def _check_html_in_cycle(html):
4237 s = remove_tags(html)
4238 # the first shareset encountered gets deleted, and its prefix
4239 # happens to be about 1/5th of the way through the ring, so the
4240 # predictor thinks we'll have 5 shares and that we'll delete them
4241 # all. This part of the test depends upon the SIs landing right
4242 # where they do now.
4243 self.failUnlessIn("The remainder of this cycle is expected to "
4244 "recover: 4 shares, 4 sharesets", s)
4245 self.failUnlessIn("The whole cycle is expected to examine "
4246 "5 shares in 5 sharesets and to recover: "
4247 "5 shares, 5 sharesets", s)
4249 return ac.set_hook('after_cycle')
4250 d3.addCallback(_check_html_in_cycle)
4252 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 0))
4253 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4254 d3.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 0)))
4255 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2, 0))
4256 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3, 1))
4257 d3.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3, (1, 0)))
4259 def _after_first_cycle(ignored):
# NOTE(review): the `s = ac.get_state()` line appears to be missing
# from this extract (gap before original 4261).
4261 last = s["history"][0]
4263 self.failUnlessEqual(last["expiration-enabled"], True)
4264 cem = last["configured-expiration-mode"]
4265 self.failUnlessEqual(cem[0], "age")
4266 self.failUnlessEqual(cem[1], 2000)
4267 self.failUnlessEqual(cem[2], None)
4268 self.failUnlessEqual(cem[3][0], "mutable")
4269 self.failUnlessEqual(cem[3][1], "immutable")
4271 rec = last["space-recovered"]
4272 self.failUnlessEqual(rec["examined-buckets"], 4)
4273 self.failUnlessEqual(rec["examined-shares"], 4)
4274 self.failUnlessEqual(rec["actual-buckets"], 2)
4275 self.failUnlessEqual(rec["actual-shares"], 2)
4276 # different platforms have different notions of "blocks used by
4277 # this file", so merely assert that it's a number
4278 self.failUnless(rec["actual-diskbytes"] >= 0,
4279 rec["actual-diskbytes"])
4280 d3.addCallback(_after_first_cycle)
4282 d3.addCallback(lambda ign: self.render1(webstatus))
4283 def _check_html_after_cycle(html):
4284 s = remove_tags(html)
4285 self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
4286 self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
4287 self.failUnlessIn(" recovered: 2 shares, 2 sharesets (1 mutable / 1 immutable), ", s)
4288 d3.addCallback(_check_html_after_cycle)
4289 d3.addBoth(self._wait_for_yield, ac)
4291 d2.addCallback(_then)
4293 d.addCallback(_do_test)
# Same scenario as test_expire_age but with mode="cutoff-date": any lease
# last renewed before `then` (2000s ago) expires. Expects the same two
# shares deleted and the same recovered/examined counts, plus the
# cutoff-date wording in the rendered status page.
# NOTE(review): the `def _do_test(ign):` and `def _then(ign):` wrapper
# headers (referenced at the end) are missing from this extract.
4296 def test_expire_cutoff_date(self):
4297 server = self.create("test_expire_cutoff_date", detached=True)
4299 # setting cutoff-date to 2000 seconds ago means that any lease which
4300 # is more than 2000s old will be expired.
4302 then = int(now - 2000)
4303 ep = ExpirationPolicy(enabled=True, mode="cutoff-date", cutoff_date=then)
4304 server.get_accountant().set_expiration_policy(ep)
4305 aa = server.get_accountant().get_anonymous_account()
4306 sa = server.get_accountant().get_starter_account()
4308 # finish as fast as possible
4309 ac = server.get_accounting_crawler()
4313 webstatus = StorageStatus(server)
4315 # create a few shares, with some leases on them
4316 d = self.make_shares(server)
4318 [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
4320 d2 = defer.succeed(None)
4321 d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 1))
4322 d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_0, (1, 0)))
4323 d2.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4324 d2.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 1)))
4325 d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2, 1))
4326 d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_2, (1, 0)))
4327 d2.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3, 1))
4328 d2.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3, (1, 1)))
4331 # artificially crank back the renewal time on the first lease of each
4332 # share to 3000s ago, and set the expiration time to 31 days later.
4333 new_renewal_time = now - 3000
4334 new_expiration_time = new_renewal_time + 31*24*60*60
4336 # Some shares have an extra lease which is set to expire at the
4337 # default time in 31 days from now (age=31days). We then run the
4338 # crawler, which will expire the first lease, making some shares get
4339 # deleted and others stay alive (with one remaining lease)
4341 aa.add_or_renew_lease(immutable_si_0, 0, new_renewal_time, new_expiration_time)
4343 # immutable_si_1 gets an extra lease
4344 sa.add_or_renew_lease(immutable_si_1, 0, new_renewal_time, new_expiration_time)
4346 aa.add_or_renew_lease(mutable_si_2, 0, new_renewal_time, new_expiration_time)
4348 # mutable_si_3 gets an extra lease
4349 sa.add_or_renew_lease(mutable_si_3, 0, new_renewal_time, new_expiration_time)
4351 server.setServiceParent(self.sparent)
4353 # now examine the web status right after the 'aa' prefix has been processed.
4354 d3 = self._after_prefix(None, 'aa', ac)
4355 d3.addCallback(lambda ign: self.render1(webstatus))
4356 def _check_html_in_cycle(html):
4357 s = remove_tags(html)
4358 # the first bucket encountered gets deleted, and its prefix
4359 # happens to be about 1/5th of the way through the ring, so the
4360 # predictor thinks we'll have 5 shares and that we'll delete them
4361 # all. This part of the test depends upon the SIs landing right
4362 # where they do now.
4363 self.failUnlessIn("The remainder of this cycle is expected to "
4364 "recover: 4 shares, 4 sharesets", s)
4365 self.failUnlessIn("The whole cycle is expected to examine "
4366 "5 shares in 5 sharesets and to recover: "
4367 "5 shares, 5 sharesets", s)
4369 return ac.set_hook('after_cycle')
4370 d3.addCallback(_check_html_in_cycle)
4372 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_0, 0))
4373 d3.addCallback(lambda ign: self._assert_sharecount(server, immutable_si_1, 1))
4374 d3.addCallback(lambda ign: self._assert_leasecount(server, immutable_si_1, (1, 0)))
4375 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_2, 0))
4376 d3.addCallback(lambda ign: self._assert_sharecount(server, mutable_si_3, 1))
4377 d3.addCallback(lambda ign: self._assert_leasecount(server, mutable_si_3, (1, 0)))
4379 def _after_first_cycle(ignored):
# NOTE(review): the `s = ac.get_state()` line appears to be missing
# from this extract (gap before original 4381).
4381 last = s["history"][0]
4383 self.failUnlessEqual(last["expiration-enabled"], True)
4384 cem = last["configured-expiration-mode"]
4385 self.failUnlessEqual(cem[0], "cutoff-date")
4386 self.failUnlessEqual(cem[1], None)
4387 self.failUnlessEqual(cem[2], then)
4388 self.failUnlessEqual(cem[3][0], "mutable")
4389 self.failUnlessEqual(cem[3][1], "immutable")
4391 rec = last["space-recovered"]
4392 self.failUnlessEqual(rec["examined-buckets"], 4)
4393 self.failUnlessEqual(rec["examined-shares"], 4)
4394 self.failUnlessEqual(rec["actual-buckets"], 2)
4395 self.failUnlessEqual(rec["actual-shares"], 2)
4396 # different platforms have different notions of "blocks used by
4397 # this file", so merely assert that it's a number
4398 self.failUnless(rec["actual-diskbytes"] >= 0,
4399 rec["actual-diskbytes"])
4400 d3.addCallback(_after_first_cycle)
4402 d3.addCallback(lambda ign: self.render1(webstatus))
4403 def _check_html_after_cycle(html):
4404 s = remove_tags(html)
4405 self.failUnlessIn("Expiration Enabled:"
4406 " expired leases will be removed", s)
4407 date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
4408 substr = "Leases created or last renewed before %s will be considered expired." % date
4409 self.failUnlessIn(substr, s)
4410 self.failUnlessIn(" recovered: 2 shares, 2 sharesets (1 mutable / 1 immutable), ", s)
4411 d3.addCallback(_check_html_after_cycle)
4412 d3.addBoth(self._wait_for_yield, ac)
4414 d2.addCallback(_then)
4416 d.addCallback(_do_test)
def test_bad_mode(self):
    """An unrecognized GC mode must be rejected when the policy is built.

    ExpirationPolicy asserts on construction, so the bad configuration is
    caught immediately rather than during a later crawl.
    """
    err = self.failUnlessRaises(AssertionError, ExpirationPolicy,
                                enabled=True, mode="bogus")
    self.failUnlessIn("GC mode 'bogus' must be 'age' or 'cutoff-date'", str(err))
# Checks time_format.parse_duration against day/month/year suffixes in
# both singular and plural forms, with and without a space, and that an
# unknown unit raises ValueError.
# NOTE(review): the DAY/MONTH/YEAR constant definitions (original
# ~4425-4427) are missing from this extract; code kept verbatim.
4424 def test_parse_duration(self):
4428 p = time_format.parse_duration
4429 self.failUnlessEqual(p("7days"), 7*DAY)
4430 self.failUnlessEqual(p("31day"), 31*DAY)
4431 self.failUnlessEqual(p("60 days"), 60*DAY)
4432 self.failUnlessEqual(p("2mo"), 2*MONTH)
4433 self.failUnlessEqual(p("3 month"), 3*MONTH)
4434 self.failUnlessEqual(p("2years"), 2*YEAR)
4435 e = self.failUnlessRaises(ValueError, p, "2kumquats")
4436 self.failUnlessIn("no unit (like day, month, or year) in '2kumquats'", str(e))
def test_parse_date(self):
    """time_format.parse_date maps an ISO-8601 date to an integer timestamp."""
    parse = time_format.parse_date
    result = parse("2009-03-18")
    # The result must be a plain int, not a float or datetime.
    self.failUnless(isinstance(result, int), result)
    self.failUnlessEqual(result, 1237334400)
# Runs the accounting crawler for several cycles and checks that the
# leasedb retains only the most recent RETAINED history entries, keyed
# by cycle number up through CYCLES.
# NOTE(review): the lines defining the CYCLES and RETAINED constants,
# the crawler speed-up settings, the `def _do_test(ign):` wrapper, and
# the early-cycle recursion guard inside _after_cycle (original gaps
# around 4445-4448, 4451-4452, 4455, 4463) are missing from this extract.
4443 def test_limited_history(self):
4444 server = self.create("test_limited_history", detached=True)
4446 # finish as fast as possible
4449 ac = server.get_accounting_crawler()
4450 ac._leasedb.retained_history_entries = RETAINED
4453 ac.allowed_cpu_proportion = 1.0
4454 ac.minimum_cycle_time = 0
4456 # create a few shares, with some leases on them
4457 d = self.make_shares(server)
4459 server.setServiceParent(self.sparent)
4461 d2 = ac.set_hook('after_cycle')
4462 def _after_cycle(cycle):
# Re-arm the hook until enough cycles have completed, then verify
# that only the last RETAINED entries survive in the history.
4464 return ac.set_hook('after_cycle').addCallback(_after_cycle)
4466 state = ac.get_state()
4467 self.failUnlessIn("history", state)
4468 h = state["history"]
4469 self.failUnlessEqual(len(h), RETAINED)
4470 self.failUnlessEqual(max(h.keys()), CYCLES)
4471 self.failUnlessEqual(min(h.keys()), CYCLES-RETAINED+1)
4472 d2.addCallback(_after_cycle)
4473 d2.addBoth(self._wait_for_yield, ac)
4474 d.addCallback(_do_test)
4477 def render_json(self, page):
4478 d = self.render1(page, args={"t": ["json"]})
4482 class AccountingCrawlerWithDiskBackend(WithDiskBackend, AccountingCrawlerTest, unittest.TestCase):
4486 #class AccountingCrawlerWithMockCloudBackend(WithMockCloudBackend, AccountingCrawlerTest, unittest.TestCase):
class WebStatusWithDiskBackend(WithDiskBackend, WebRenderingMixin, unittest.TestCase):
    def test_no_server(self):
        """With no server attached, the status page shows a 'no server' heading."""
        page = StorageStatus(None)
        rendered = page.renderSynchronously()
        self.failUnlessIn("<h1>No Storage Server Running</h1>", rendered)
4496 def test_status(self):
4497 server = self.create("test_status")
4499 w = StorageStatus(server, "nickname")
4501 def _check_html(html):
4502 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4503 s = remove_tags(html)
4504 self.failUnlessIn("Server Nickname: nickname", s)
4505 self.failUnlessIn("Server Nodeid: %s" % base32.b2a(server.get_serverid()), s)
4506 self.failUnlessIn("Accepting new shares: Yes", s)
4507 self.failUnlessIn("Reserved space: - 0 B (0)", s)
4508 d.addCallback(_check_html)
4509 d.addCallback(lambda ign: self.render_json(w))
4510 def _check_json(json):
4511 data = simplejson.loads(json)
4513 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4514 self.failUnlessEqual(s["storage_server.reserved_space"], 0)
4515 self.failUnlessIn("bucket-counter", data)
4516 self.failUnlessIn("lease-checker", data)
4517 d.addCallback(_check_json)
4520 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4521 def test_status_no_disk_stats(self, mock_get_disk_stats):
4522 mock_get_disk_stats.side_effect = AttributeError()
4524 # Some platforms may have no disk stats API. Make sure the code can handle that
4525 # (test runs on all platforms).
4526 server = self.create("test_status_no_disk_stats")
4528 w = StorageStatus(server)
4529 html = w.renderSynchronously()
4530 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4531 s = remove_tags(html)
4532 self.failUnlessIn("Accepting new shares: Yes", s)
4533 self.failUnlessIn("Total disk space: ?", s)
4534 self.failUnlessIn("Space Available to Tahoe: ?", s)
4535 self.failUnless(server.get_available_space() is None)
4537 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4538 def test_status_bad_disk_stats(self, mock_get_disk_stats):
4539 mock_get_disk_stats.side_effect = OSError()
4541 # If the API to get disk stats exists but a call to it fails, then the status should
4542 # show that no shares will be accepted, and get_available_space() should be 0.
4543 server = self.create("test_status_bad_disk_stats")
4545 w = StorageStatus(server)
4546 html = w.renderSynchronously()
4547 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4548 s = remove_tags(html)
4549 self.failUnlessIn("Accepting new shares: No", s)
4550 self.failUnlessIn("Total disk space: ?", s)
4551 self.failUnlessIn("Space Available to Tahoe: ?", s)
4552 self.failUnlessEqual(server.get_available_space(), 0)
4554 @mock.patch('allmydata.util.fileutil.get_disk_stats')
4555 def test_status_right_disk_stats(self, mock_get_disk_stats):
4558 free_for_root = 4*GB
4559 free_for_nonroot = 3*GB
4560 reserved_space = 1*GB
4561 used = total - free_for_root
4562 avail = max(free_for_nonroot - reserved_space, 0)
4563 mock_get_disk_stats.return_value = {
4565 'free_for_root': free_for_root,
4566 'free_for_nonroot': free_for_nonroot,
4571 server = self.create("test_status_right_disk_stats", reserved_space=GB)
4572 expecteddir = server.backend._sharedir
4574 w = StorageStatus(server)
4575 html = w.renderSynchronously()
4577 self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
4578 (mock_get_disk_stats.call_args_list, expecteddir, reserved_space))
4580 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4581 s = remove_tags(html)
4582 self.failUnlessIn("Total disk space: 5.00 GB", s)
4583 self.failUnlessIn("Disk space used: - 1.00 GB", s)
4584 self.failUnlessIn("Disk space free (root): 4.00 GB", s)
4585 self.failUnlessIn("Disk space free (non-root): 3.00 GB", s)
4586 self.failUnlessIn("Reserved space: - 1.00 GB", s)
4587 self.failUnlessIn("Space Available to Tahoe: 2.00 GB", s)
4588 self.failUnlessEqual(server.get_available_space(), 2*GB)
4590 def test_readonly(self):
4591 server = self.create("test_readonly", readonly=True)
4593 w = StorageStatus(server)
4594 html = w.renderSynchronously()
4595 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4596 s = remove_tags(html)
4597 self.failUnlessIn("Accepting new shares: No", s)
4599 def test_reserved(self):
4600 server = self.create("test_reserved", reserved_space=10e6)
4602 w = StorageStatus(server)
4603 html = w.renderSynchronously()
4604 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4605 s = remove_tags(html)
4606 self.failUnlessIn("Reserved space: - 10.00 MB (10000000)", s)
4608 def test_util(self):
4609 w = StorageStatus(None)
4610 self.failUnlessEqual(w.render_space(None, None), "?")
4611 self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
4612 self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
4613 self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
4614 self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
4615 self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
4618 class WebStatusWithMockCloudBackend(WithMockCloudBackend, WebRenderingMixin, unittest.TestCase):
4619 def test_status(self):
4620 server = self.create("test_status")
4622 w = StorageStatus(server, "nickname")
4624 def _check_html(html):
4625 self.failUnlessIn("<h1>Storage Server Status</h1>", html)
4626 s = remove_tags(html)
4627 self.failUnlessIn("Server Nickname: nickname", s)
4628 self.failUnlessIn("Server Nodeid: %s" % base32.b2a(server.get_serverid()), s)
4629 self.failUnlessIn("Accepting new shares: Yes", s)
4630 d.addCallback(_check_html)
4631 d.addCallback(lambda ign: self.render_json(w))
4632 def _check_json(json):
4633 data = simplejson.loads(json)
4635 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
4636 self.failUnlessIn("bucket-counter", data)
4637 self.failUnlessIn("lease-checker", data)
4638 d.addCallback(_check_json)