-
-import os
+import os, random, struct
from zope.interface import implements
from twisted.internet import defer
-from twisted.internet.interfaces import IConsumer
+from twisted.internet.interfaces import IPullProducer
from twisted.python import failure
from twisted.application import service
from twisted.web.error import Error as WebError
-from foolscap.eventual import flushEventualQueue, fireEventually
+from foolscap.api import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode
-from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
- FileTooLargeError, NotEnoughSharesError, ICheckable
-from allmydata.checker_results import CheckerResults, CheckAndRepairResults, \
+from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\
+ NotEnoughSharesError, ICheckable, \
+ IMutableUploadable, SDMF_VERSION, \
+ MDMF_VERSION
+from allmydata.check_results import CheckResults, CheckAndRepairResults, \
DeepCheckResults, DeepCheckAndRepairResults
from allmydata.mutable.common import CorruptShareError
-from allmydata.storage import storage_index_to_dir
-from allmydata.util import log, fileutil, pollmixin
+from allmydata.mutable.layout import unpack_header
+from allmydata.mutable.publish import MutableData
+from allmydata.storage.mutable import MutableShareFile
+from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
+from allmydata.util.consumer import download_to_data
from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService
-import common_util as testutil
+import allmydata.test.common_util as testutil
+from allmydata import immutable
+TEST_RSA_KEY_SIZE = 522
def flush_but_dont_ignore(res):
d = flushEventualQueue()
d.addCallback(_done)
return d
+class DummyProducer:
+ implements(IPullProducer)
+ def resumeProducing(self):
+ pass
+
class FakeCHKFileNode:
- """I provide IFileNode, but all of my data is stored in a class-level
- dictionary."""
- implements(IFileNode)
+ """I provide IImmutableFileNode, but all of my data is stored in a
+ class-level dictionary."""
+ implements(IImmutableFileNode)
all_contents = {}
bad_shares = {}
- def __init__(self, u, client):
- precondition(IURI.providedBy(u), u)
- self.client = client
- self.my_uri = u
- self.storage_index = u.storage_index
+ def __init__(self, filecap):
+ precondition(isinstance(filecap, (uri.CHKFileURI, uri.LiteralFileURI)), filecap)
+ self.my_uri = filecap
+ self.storage_index = self.my_uri.get_storage_index()
def get_uri(self):
return self.my_uri.to_string()
+ def get_write_uri(self):
+ return None
def get_readonly_uri(self):
return self.my_uri.to_string()
+ def get_cap(self):
+ return self.my_uri
def get_verify_cap(self):
return self.my_uri.get_verify_cap()
+ def get_repair_cap(self):
+ return self.my_uri.get_verify_cap()
def get_storage_index(self):
return self.storage_index
- def check(self, monitor, verify=False):
- r = CheckerResults(self.my_uri, self.storage_index)
+ def check(self, monitor, verify=False, add_lease=False):
+ r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
data["count-shares-needed"] = 3
data["count-recoverable-versions"] = 1
data["count-unrecoverable-versions"] = 0
if is_bad:
- r.set_healthy(False)
- r.set_recoverable(True)
- data["count-shares-good"] = 9
- data["list-corrupt-shares"] = [(nodeid, self.storage_index, 0)]
- r.problems = failure.Failure(CorruptShareError(is_bad))
+ r.set_healthy(False)
+ r.set_recoverable(True)
+ data["count-shares-good"] = 9
+ data["list-corrupt-shares"] = [(nodeid, self.storage_index, 0)]
+ r.problems = failure.Failure(CorruptShareError(is_bad))
else:
- r.set_healthy(True)
- r.set_recoverable(True)
- data["count-shares-good"] = 10
- r.problems = []
+ r.set_healthy(True)
+ r.set_recoverable(True)
+ data["count-shares-good"] = 10
+ r.problems = []
r.set_data(data)
r.set_needs_rebalancing(False)
return defer.succeed(r)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
return False
def is_readonly(self):
return True
+ def is_unknown(self):
+ return False
+ def is_allowed_in_immutable_directory(self):
+ return True
+ def raise_error(self):
+ pass
- def download(self, target):
- if self.my_uri.to_string() not in self.all_contents:
- f = failure.Failure(NotEnoughSharesError())
- target.fail(f)
- return defer.fail(f)
- data = self.all_contents[self.my_uri.to_string()]
- target.open(len(data))
- target.write(data)
- target.close()
- return defer.maybeDeferred(target.finish)
- def download_to_data(self):
- if self.my_uri.to_string() not in self.all_contents:
- return defer.fail(NotEnoughSharesError())
- data = self.all_contents[self.my_uri.to_string()]
- return defer.succeed(data)
def get_size(self):
+ if isinstance(self.my_uri, uri.LiteralFileURI):
+ return self.my_uri.get_size()
try:
data = self.all_contents[self.my_uri.to_string()]
except KeyError, le:
- raise NotEnoughSharesError(le)
+ raise NotEnoughSharesError(le, 0, 3)
return len(data)
+
def read(self, consumer, offset=0, size=None):
- d = self.download_to_data()
- def _got(data):
- start = offset
- if size is not None:
- end = offset + size
- else:
- end = len(data)
- consumer.write(data[start:end])
- return consumer
- d.addCallback(_got)
+ # we don't bother to call registerProducer/unregisterProducer,
+ # because it's a hassle to write a dummy Producer that does the right
+ # thing (we have to make sure that DummyProducer.resumeProducing
+ # writes the data into the consumer immediately, otherwise it will
+ # loop forever).
+
+ d = defer.succeed(None)
+ d.addCallback(self._read, consumer, offset, size)
return d
-def make_chk_file_uri(size):
+ def _read(self, ignored, consumer, offset, size):
+ if isinstance(self.my_uri, uri.LiteralFileURI):
+ data = self.my_uri.data
+ else:
+ if self.my_uri.to_string() not in self.all_contents:
+ raise NotEnoughSharesError(None, 0, 3)
+ data = self.all_contents[self.my_uri.to_string()]
+ start = offset
+ if size is not None:
+ end = offset + size
+ else:
+ end = len(data)
+ consumer.write(data[start:end])
+ return consumer
+
+
+ def get_best_readable_version(self):
+ return defer.succeed(self)
+
+
+ def download_to_data(self):
+ return download_to_data(self)
+
+
+ download_best_version = download_to_data
+
+
+ def get_size_of_best_version(self):
+ return defer.succeed(self.get_size())
+
+
+def make_chk_file_cap(size):
return uri.CHKFileURI(key=os.urandom(16),
uri_extension_hash=os.urandom(32),
needed_shares=3,
total_shares=10,
size=size)
+def make_chk_file_uri(size):
+ return make_chk_file_cap(size).to_string()
-def create_chk_filenode(client, contents):
- u = make_chk_file_uri(len(contents))
- n = FakeCHKFileNode(u, client)
- FakeCHKFileNode.all_contents[u.to_string()] = contents
+def create_chk_filenode(contents):
+ filecap = make_chk_file_cap(len(contents))
+ n = FakeCHKFileNode(filecap)
+ FakeCHKFileNode.all_contents[filecap.to_string()] = contents
return n
MUTABLE_SIZELIMIT = 10000
all_contents = {}
bad_shares = {}
-
- def __init__(self, client):
- self.client = client
- self.my_uri = make_mutable_file_uri()
- self.storage_index = self.my_uri.storage_index
- def create(self, initial_contents, key_generator=None):
- if len(initial_contents) > self.MUTABLE_SIZELIMIT:
- raise FileTooLargeError("SDMF is limited to one segment, and "
- "%d > %d" % (len(initial_contents),
- self.MUTABLE_SIZELIMIT))
- self.all_contents[self.storage_index] = initial_contents
+ file_types = {} # storage index => MDMF_VERSION or SDMF_VERSION
+
+ def __init__(self, storage_broker, secret_holder,
+ default_encoding_parameters, history):
+ self.init_from_cap(make_mutable_file_cap())
+ self._k = default_encoding_parameters['k']
+ self._segsize = default_encoding_parameters['max_segment_size']
+ def create(self, contents, key_generator=None, keysize=None,
+ version=SDMF_VERSION):
+ if version == MDMF_VERSION and \
+ isinstance(self.my_uri, (uri.ReadonlySSKFileURI,
+ uri.WriteableSSKFileURI)):
+ self.init_from_cap(make_mdmf_mutable_file_cap())
+ self.file_types[self.storage_index] = version
+ initial_contents = self._get_initial_contents(contents)
+ data = initial_contents.read(initial_contents.get_size())
+ data = "".join(data)
+ self.all_contents[self.storage_index] = data
+ self.my_uri.set_extension_params([self._k, self._segsize])
return defer.succeed(self)
- def init_from_uri(self, myuri):
- self.my_uri = IURI(myuri)
- self.storage_index = self.my_uri.storage_index
+ def _get_initial_contents(self, contents):
+ if contents is None:
+ return MutableData("")
+
+ if IMutableUploadable.providedBy(contents):
+ return contents
+
+ assert callable(contents), "%s should be callable, not %s" % \
+ (contents, type(contents))
+ return contents(self)
+ def init_from_cap(self, filecap):
+ assert isinstance(filecap, (uri.WriteableSSKFileURI,
+ uri.ReadonlySSKFileURI,
+ uri.WriteableMDMFFileURI,
+ uri.ReadonlyMDMFFileURI))
+ self.my_uri = filecap
+ self.storage_index = self.my_uri.get_storage_index()
+ if isinstance(filecap, (uri.WriteableMDMFFileURI,
+ uri.ReadonlyMDMFFileURI)):
+ self.file_types[self.storage_index] = MDMF_VERSION
+
+ else:
+ self.file_types[self.storage_index] = SDMF_VERSION
+
return self
+ def get_cap(self):
+ return self.my_uri
+ def get_readcap(self):
+ return self.my_uri.get_readonly()
def get_uri(self):
return self.my_uri.to_string()
+ def get_write_uri(self):
+ if self.is_readonly():
+ return None
+ return self.my_uri.to_string()
def get_readonly(self):
return self.my_uri.get_readonly()
def get_readonly_uri(self):
return self.my_uri.get_readonly().to_string()
+ def get_verify_cap(self):
+ return self.my_uri.get_verify_cap()
+ def get_repair_cap(self):
+ if self.my_uri.is_readonly():
+ return None
+ return self.my_uri
def is_readonly(self):
return self.my_uri.is_readonly()
def is_mutable(self):
return self.my_uri.is_mutable()
+ def is_unknown(self):
+ return False
+ def is_allowed_in_immutable_directory(self):
+ return not self.my_uri.is_mutable()
+ def raise_error(self):
+ pass
def get_writekey(self):
return "\x00"*16
def get_size(self):
- return "?" # TODO: see mutable.MutableFileNode.get_size
+ return len(self.all_contents[self.storage_index])
+ def get_current_size(self):
+ return self.get_size_of_best_version()
def get_size_of_best_version(self):
return defer.succeed(len(self.all_contents[self.storage_index]))
def get_storage_index(self):
return self.storage_index
- def check(self, monitor, verify=False):
- r = CheckerResults(self.my_uri, self.storage_index)
+ def get_servermap(self, mode):
+ return defer.succeed(None)
+
+ def get_version(self):
+ assert self.storage_index in self.file_types
+ return self.file_types[self.storage_index]
+
+ def check(self, monitor, verify=False, add_lease=False):
+ r = CheckResults(self.my_uri, self.storage_index)
is_bad = self.bad_shares.get(self.storage_index, None)
data = {}
data["count-shares-needed"] = 3
data["count-recoverable-versions"] = 1
data["count-unrecoverable-versions"] = 0
if is_bad:
- r.set_healthy(False)
- r.set_recoverable(True)
- data["count-shares-good"] = 9
- r.problems = failure.Failure(CorruptShareError("peerid",
- 0, # shnum
- is_bad))
+ r.set_healthy(False)
+ r.set_recoverable(True)
+ data["count-shares-good"] = 9
+ r.problems = failure.Failure(CorruptShareError("peerid",
+ 0, # shnum
+ is_bad))
else:
- r.set_healthy(True)
- r.set_recoverable(True)
- data["count-shares-good"] = 10
- r.problems = []
+ r.set_healthy(True)
+ r.set_recoverable(True)
+ data["count-shares-good"] = 10
+ r.problems = []
r.set_data(data)
r.set_needs_rebalancing(False)
return defer.succeed(r)
- def check_and_repair(self, monitor, verify=False):
+ def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
def _got(cr):
r = CheckAndRepairResults(self.storage_index)
d.addCallback(_got)
return d
- def deep_check(self, verify=False):
+ def deep_check(self, verify=False, add_lease=False):
d = self.check(verify)
def _done(r):
dr = DeepCheckResults(self.storage_index)
d.addCallback(_done)
return d
- def deep_check_and_repair(self, verify=False):
+ def deep_check_and_repair(self, verify=False, add_lease=False):
d = self.check_and_repair(verify)
def _done(r):
dr = DeepCheckAndRepairResults(self.storage_index)
return d
def download_best_version(self):
- return defer.succeed(self.all_contents[self.storage_index])
+ return defer.succeed(self._download_best_version())
+
+
+ def _download_best_version(self, ignored=None):
+ if isinstance(self.my_uri, uri.LiteralFileURI):
+ return self.my_uri.data
+ if self.storage_index not in self.all_contents:
+ raise NotEnoughSharesError(None, 0, 3)
+ return self.all_contents[self.storage_index]
+
+
def overwrite(self, new_contents):
- if len(new_contents) > self.MUTABLE_SIZELIMIT:
- raise FileTooLargeError("SDMF is limited to one segment, and "
- "%d > %d" % (len(new_contents),
- self.MUTABLE_SIZELIMIT))
assert not self.is_readonly()
- self.all_contents[self.storage_index] = new_contents
+ new_data = new_contents.read(new_contents.get_size())
+ new_data = "".join(new_data)
+ self.all_contents[self.storage_index] = new_data
+ self.my_uri.set_extension_params([self._k, self._segsize])
return defer.succeed(None)
def modify(self, modifier):
# this does not implement FileTooLargeError, but the real one does
def _modify(self, modifier):
assert not self.is_readonly()
old_contents = self.all_contents[self.storage_index]
- self.all_contents[self.storage_index] = modifier(old_contents, None, True)
+ new_data = modifier(old_contents, None, True)
+ self.all_contents[self.storage_index] = new_data
+ self.my_uri.set_extension_params([self._k, self._segsize])
return None
- def download(self, target):
- if self.storage_index not in self.all_contents:
- f = failure.Failure(NotEnoughSharesError())
- target.fail(f)
- return defer.fail(f)
- data = self.all_contents[self.storage_index]
- target.open(len(data))
- target.write(data)
- target.close()
- return defer.maybeDeferred(target.finish)
- def download_to_data(self):
- if self.storage_index not in self.all_contents:
- return defer.fail(NotEnoughSharesError())
- data = self.all_contents[self.storage_index]
- return defer.succeed(data)
+ # As actually implemented, MutableFilenode and MutableFileVersion
+ # are distinct. However, nothing in the webapi uses (yet) that
+ # distinction -- it just uses the unified download interface
+ # provided by get_best_readable_version and read. When we start
+ # doing cooler things like LDMF, we will want to revise this code to
+ # be less simplistic.
+ def get_best_readable_version(self):
+ return defer.succeed(self)
+
+
+ def get_best_mutable_version(self):
+ return defer.succeed(self)
+
+ # Ditto for this, which is an implementation of IWriteable.
+ # XXX: declare formally (via implements) that IWritable is implemented here.
+ def update(self, data, offset):
+ assert not self.is_readonly()
+ def modifier(old, servermap, first_time):
+ new = old[:offset] + "".join(data.read(data.get_size()))
+ new += old[len(new):]
+ return new
+ return self.modify(modifier)
+
+
+ def read(self, consumer, offset=0, size=None):
+ data = self._download_best_version()
+ if size:
+ data = data[offset:offset+size]
+ consumer.write(data)
+ return defer.succeed(consumer)
-def make_mutable_file_uri():
+
+def make_mutable_file_cap():
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
fingerprint=os.urandom(32))
+
+def make_mdmf_mutable_file_cap():
+ return uri.WriteableMDMFFileURI(writekey=os.urandom(16),
+ fingerprint=os.urandom(32))
+
+def make_mutable_file_uri(mdmf=False):
+ if mdmf:
+ uri = make_mdmf_mutable_file_cap()
+ else:
+ uri = make_mutable_file_cap()
+
+ return uri.to_string()
+
def make_verifier_uri():
return uri.SSKVerifierURI(storage_index=os.urandom(16),
- fingerprint=os.urandom(32))
+ fingerprint=os.urandom(32)).to_string()
-class FakeDirectoryNode(dirnode.NewDirectoryNode):
+def create_mutable_filenode(contents, mdmf=False):
+ # XXX: All of these arguments are kind of stupid.
+ if mdmf:
+ cap = make_mdmf_mutable_file_cap()
+ else:
+ cap = make_mutable_file_cap()
+
+ encoding_params = {}
+ encoding_params['k'] = 3
+ encoding_params['max_segment_size'] = 128*1024
+
+ filenode = FakeMutableFileNode(None, None, encoding_params, None)
+ filenode.init_from_cap(cap)
+ if mdmf:
+ filenode.create(MutableData(contents), version=MDMF_VERSION)
+ else:
+ filenode.create(MutableData(contents), version=SDMF_VERSION)
+ return filenode
+
+
+class FakeDirectoryNode(dirnode.DirectoryNode):
"""This offers IDirectoryNode, but uses a FakeMutableFileNode for the
backing store, so it doesn't go to the grid. The child data is still
encrypted and serialized, so this isn't useful for tests that want to
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
-
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
+ # SystemTestMixin tests tend to be a lot of work, and we have a few
+ # buildslaves that are pretty slow, and every once in a while these tests
+ # run up against the default 120 second timeout. So increase the default
+ # timeout. Individual test cases can override this, of course.
+ timeout = 300
+
def setUp(self):
self.sparent = service.MultiService()
self.sparent.startService()
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
fileutil.make_dirs(iv_dir)
- f = open(os.path.join(iv_dir, "webport"), "w")
- f.write("tcp:0:interface=127.0.0.1\n")
- f.close()
+ fileutil.write(os.path.join(iv_dir, 'tahoe.cfg'), \
+ "[node]\n" + \
+ "web.port = tcp:0:interface=127.0.0.1\n")
if SYSTEM_TEST_CERTS:
os.mkdir(os.path.join(iv_dir, "private"))
f = open(os.path.join(iv_dir, "private", "node.pem"), "w")
kgsdir = self.getdir("key_generator")
fileutil.make_dirs(kgsdir)
- self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
+ self.key_generator_svc = KeyGeneratorService(kgsdir,
+ display_furl=False,
+ default_key_size=TEST_RSA_KEY_SIZE)
self.key_generator_svc.key_generator.pool_size = 4
self.key_generator_svc.key_generator.pool_refresh_delay = 60
self.add_service(self.key_generator_svc)
f.write(SYSTEM_TEST_CERTS[i+1])
f.close()
- def write(name, value):
- open(os.path.join(basedir, name), "w").write(value+"\n")
+ config = "[client]\n"
+ config += "introducer.furl = %s\n" % self.introducer_furl
+ if self.stats_gatherer_furl:
+ config += "stats_gatherer.furl = %s\n" % self.stats_gatherer_furl
+
if i == 0:
- # client[0] runs a webserver and a helper, no key_generator
- write("webport", "tcp:0:interface=127.0.0.1")
- write("run_helper", "yes")
- write("keepalive_timeout", "600")
+ # clients[0] runs a webserver and a helper, no key_generator
+ config += "[node]\n"
+ config += "web.port = tcp:0:interface=127.0.0.1\n"
+ config += "timeout.keepalive = 600\n"
+ config += "[helper]\n"
+ config += "enabled = True\n"
if i == 3:
- # client[3] runs a webserver and uses a helper, uses
+ # clients[3] runs a webserver and uses a helper, uses
# key_generator
- write("webport", "tcp:0:interface=127.0.0.1")
- write("disconnect_timeout", "1800")
if self.key_generator_furl:
- kgf = "%s\n" % (self.key_generator_furl,)
- write("key_generator.furl", kgf)
- write("introducer.furl", self.introducer_furl)
- if self.stats_gatherer_furl:
- write("stats_gatherer.furl", self.stats_gatherer_furl)
+ config += "key_generator.furl = %s\n" % self.key_generator_furl
+ config += "[node]\n"
+ config += "web.port = tcp:0:interface=127.0.0.1\n"
+ config += "timeout.disconnect = 1800\n"
+
+ fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
- # give subclasses a chance to append liens to the node's tahoe.cfg
+ # give subclasses a chance to append lines to the node's tahoe.cfg
# files before they are launched.
self._set_up_nodes_extra_config()
- # start client[0], wait for it's tub to be ready (at which point it
+ # start clients[0], wait for its tub to be ready (at which point it
# will have registered the helper furl).
c = self.add_service(client.Client(basedir=basedirs[0]))
self.clients.append(c)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
d = c.when_tub_ready()
def _ready(res):
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
helper_furl = f.read()
f.close()
self.helper_furl = helper_furl
- f = open(os.path.join(basedirs[3],"helper.furl"), "w")
- f.write(helper_furl)
- f.close()
+ if self.numclients >= 4:
+ f = open(os.path.join(basedirs[3], 'tahoe.cfg'), 'ab+')
+ f.write(
+ "[client]\n"
+ "helper.furl = %s\n" % helper_furl)
+ f.close()
# this starts the rest of the clients
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
self.clients.append(c)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
log.msg("STARTING")
return self.wait_for_connections()
d.addCallback(_ready)
def _connected(res):
log.msg("CONNECTED")
# now find out where the web port was
- l = self.clients[0].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.webish_url = "http://localhost:%d/" % port
- # and the helper-using webport
- l = self.clients[3].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.helper_webish_url = "http://localhost:%d/" % port
+ self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
+ if self.numclients >=4:
+ # and the helper-using webport
+ self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
d.addCallback(_connected)
return d
def _stopped(res):
new_c = client.Client(basedir=self.getdir("client%d" % num))
self.clients[num] = new_c
+ new_c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
self.add_service(new_c)
return new_c.when_tub_ready()
d.addCallback(_stopped)
def _maybe_get_webport(res):
if num == 0:
# now find out where the web port was
- l = self.clients[0].getServiceNamed("webish").listener
- port = l._port.getHost().port
- self.webish_url = "http://localhost:%d/" % port
+ self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
d.addCallback(_maybe_get_webport)
return d
basedir = self.getdir("client%d" % client_num)
if not os.path.isdir(basedir):
fileutil.make_dirs(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ config = "[client]\n"
+ config += "introducer.furl = %s\n" % self.introducer_furl
if helper_furl:
- f = open(os.path.join(basedir, "helper.furl") ,"w")
- f.write(helper_furl+"\n")
- f.close()
+ config += "helper.furl = %s\n" % helper_furl
+ fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
c = client.Client(basedir=basedir)
self.clients.append(c)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
self.numclients += 1
if add_to_sparent:
c.setServiceParent(self.sparent)
def _check_connections(self):
for c in self.clients:
- ic = c.introducer_client
- if not ic.connected_to_introducer():
+ if not c.connected_to_introducer():
return False
- if len(ic.get_all_peerids()) != self.numclients:
+ sb = c.get_storage_broker()
+ if len(sb.get_connected_servers()) != self.numclients:
return False
return True
# To disable the pre-computed tub certs, uncomment this line.
#SYSTEM_TEST_CERTS = []
-class ShareManglingMixin(SystemTestMixin):
-
- def find_shares(self, unused=None):
- """Locate shares on disk. Returns a dict that maps
- (clientnum,sharenum) to a string that contains the share container
- (copied directly from the disk, containing leases etc). You can
- modify this dict and then call replace_shares() to modify the shares.
- """
- shares = {} # k: (i, sharenum), v: data
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- data = open(os.path.join(sharedir, dirp, fn), "r").read()
- shares[(i, sharenum)] = data
-
- return shares
-
- def replace_shares(self, newshares, storage_index):
- """Replace shares on disk. Takes a dictionary in the same form
- as find_shares() returns."""
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- pathtosharefile = os.path.join(sharedir, dirp, fn)
- os.unlink(pathtosharefile)
- for ((clientnum, sharenum), newdata) in newshares.iteritems():
- if clientnum == i:
- fullsharedirp=os.path.join(sharedir, storage_index_to_dir(storage_index))
- fileutil.make_dirs(fullsharedirp)
- wf = open(os.path.join(fullsharedirp, str(sharenum)), "w")
- wf.write(newdata)
+TEST_DATA="\x02"*(immutable.upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
class ShouldFailMixin:
def shouldFail(self, expected_failure, which, substring,
error message, if any, because Deferred chains frequently make it
difficult to tell which assertion was tripped.
- The substring= argument, if not None, must appear inside the
- stringified Failure, or the test will fail.
+ The substring= argument, if not None, must appear in the 'repr'
+ of the message wrapped by this Failure, or the test will fail.
"""
assert substring is None or isinstance(substring, str)
if isinstance(res, failure.Failure):
res.trap(expected_failure)
if substring:
- self.failUnless(substring in str(res),
+ message = repr(res.value.args[0])
+ self.failUnless(substring in message,
"substring '%s' not in '%s'"
- % (substring, str(res)))
+ % (substring, message))
else:
self.fail("%s was supposed to raise %s, not get '%s'" %
(which, expected_failure, res))
f.trap(WebError)
print "Web Error:", f.value, ":", f.value.response
return f
+
+ def _shouldHTTPError(self, res, which, validator):
+ if isinstance(res, failure.Failure):
+ res.trap(WebError)
+ return validator(res)
+ else:
+ self.fail("%s was supposed to Error, not get '%s'" % (which, res))
+
+ def shouldHTTPError(self, which,
+ code=None, substring=None, response_substring=None,
+ callable=None, *args, **kwargs):
+ # returns a Deferred with the response body
+ assert substring is None or isinstance(substring, str)
+ assert callable
+ def _validate(f):
+ if code is not None:
+ self.failUnlessEqual(f.value.status, str(code))
+ if substring:
+ code_string = str(f)
+ self.failUnless(substring in code_string,
+ "substring '%s' not in '%s'"
+ % (substring, code_string))
+ response_body = f.value.response
+ if response_substring:
+ self.failUnless(response_substring in response_body,
+ "response substring '%s' not in '%s'"
+ % (response_substring, response_body))
+ return response_body
+ d = defer.maybeDeferred(callable, *args, **kwargs)
+ d.addBoth(self._shouldHTTPError, which, _validate)
+ return d
+
class ErrorMixin(WebErrorMixin):
def explain_error(self, f):
if f.check(defer.FirstError):
print "First Error:", f.value.subFailure
return f
-class MemoryConsumer:
- implements(IConsumer)
- def __init__(self):
- self.chunks = []
- self.done = False
- def registerProducer(self, p, streaming):
- if streaming:
- # call resumeProducing once to start things off
- p.resumeProducing()
- else:
- while not self.done:
- p.resumeProducing()
- def write(self, data):
- self.chunks.append(data)
- def unregisterProducer(self):
- self.done = True
-
-def download_to_data(n, offset=0, size=None):
- d = n.read(MemoryConsumer(), offset, size)
- d.addCallback(lambda mc: "".join(mc.chunks))
- return d
+def corrupt_field(data, offset, size, debug=False):
+    """Corrupt the 'size'-byte field at 'offset' in 'data': half the time
+    flip a single random bit in it, half the time overwrite the whole field
+    with random bytes.  Returns the corrupted copy of the data."""
+    flip_single_bit = random.random() < 0.5
+    if flip_single_bit:
+        newdata = testutil.flip_one_bit(data, offset, size)
+        if debug:
+            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
+        return newdata
+    newval = testutil.insecurerandstr(size)
+    if debug:
+        log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
+    return data[:offset] + newval + data[offset+size:]
+
+def _corrupt_nothing(data, debug=False):
+    """No-op corruption function: return the share data unmodified."""
+    return data
+
+def _corrupt_file_version_number(data, debug=False):
+    """Scramble the file data -- the share file version number will have one
+    bit flipped or else will be changed to a random value."""
+    return corrupt_field(data, 0x00, 4)
+
+def _corrupt_size_of_file_data(data, debug=False):
+    """Scramble the file data -- the field showing the size of the share data
+    within the file will have one bit flipped or else will be changed to a
+    random value."""
+    return corrupt_field(data, 0x04, 4)
+
+def _corrupt_sharedata_version_number(data, debug=False):
+    """Scramble the file data -- the share data version number will have one
+    bit flipped or else will be changed to a random value."""
+    # NOTE(review): the original body had several unreachable statements
+    # after this return (they re-randomized the field while avoiding the
+    # valid values 1 and 2, as the old docstring promised).  The live code
+    # has always been this single corrupt_field call, so the dead code is
+    # removed and the docstring corrected to match actual behavior.
+    return corrupt_field(data, 0x0c, 4)
+
+def _corrupt_sharedata_version_number_to_plausible_version(data, debug=False):
+    """Scramble the file data -- the share data version number will be
+    changed to 2 if it is 1 or else to 1 if it is 2."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # Swap 1 <-> 2 so the field stays a plausible version number.
+    newsharevernumbytes = struct.pack(">L", 3 - sharevernum)
+    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
+
+def _corrupt_segment_size(data, debug=False):
+    """Scramble the file data -- the field showing the size of the segment
+    will have one bit flipped or else be changed to a random value."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1 stores the segment size as 4 bytes, v2 as 8 bytes, both at 0x04
+    # past the 0x0c-byte server-side header.  Forward the caller's debug
+    # flag (it was previously hard-coded to False).
+    if sharevernum == 1:
+        return corrupt_field(data, 0x0c+0x04, 4, debug=debug)
+    else:
+        return corrupt_field(data, 0x0c+0x04, 8, debug=debug)
+
+def _corrupt_size_of_sharedata(data, debug=False):
+    """Scramble the file data -- the field showing the size of the data
+    within the share data will have one bit flipped or else will be changed
+    to a random value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x08 past the header; v2: 8-byte field at 0x0c.
+    if version == 1:
+        return corrupt_field(data, 0x0c+0x08, 4)
+    return corrupt_field(data, 0x0c+0x0c, 8)
+
+def _corrupt_offset_of_sharedata(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the data
+    within the share data will have one bit flipped or else be changed to a
+    random value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x0c past the header; v2: 8-byte field at 0x14.
+    if version == 1:
+        return corrupt_field(data, 0x0c+0x0c, 4)
+    return corrupt_field(data, 0x0c+0x14, 8)
+
+def _corrupt_offset_of_ciphertext_hash_tree(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the
+    ciphertext hash tree within the share data will have one bit flipped or
+    else be changed to a random value.
+    """
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x14 past the header; v2: 8-byte field at 0x24.
+    # Forward the caller's debug flag (previously hard-coded to False).
+    if sharevernum == 1:
+        return corrupt_field(data, 0x0c+0x14, 4, debug=debug)
+    else:
+        return corrupt_field(data, 0x0c+0x24, 8, debug=debug)
+
+def _corrupt_offset_of_block_hashes(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the block
+    hash tree within the share data will have one bit flipped or else will be
+    changed to a random value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x18 past the header; v2: 8-byte field at 0x2c.
+    if version == 1:
+        return corrupt_field(data, 0x0c+0x18, 4)
+    return corrupt_field(data, 0x0c+0x2c, 8)
+
+def _corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the block
+    hash tree within the share data will have a multiple of hash size
+    subtracted from it, thus causing the downloader to download an incomplete
+    crypttext hash tree."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    if sharevernum == 1:
+        # v1: the block-hash-tree offset is a 4-byte big-endian field at
+        # 0x18 past the 0x0c-byte server-side header.
+        curval = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
+        # Pick a random hash-aligned value at most half the current offset,
+        # so the crypttext hash tree appears to end early but stays aligned
+        # on CRYPTO_VAL_SIZE boundaries.
+        newval = random.randrange(0, max(1, (curval/hashutil.CRYPTO_VAL_SIZE)/2))*hashutil.CRYPTO_VAL_SIZE
+        newvalstr = struct.pack(">L", newval)
+        return data[:0x0c+0x18]+newvalstr+data[0x0c+0x18+4:]
+    else:
+        # v2: the same field is 8 bytes at 0x2c past the header.
+        curval = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
+        newval = random.randrange(0, max(1, (curval/hashutil.CRYPTO_VAL_SIZE)/2))*hashutil.CRYPTO_VAL_SIZE
+        newvalstr = struct.pack(">Q", newval)
+        return data[:0x0c+0x2c]+newvalstr+data[0x0c+0x2c+8:]
+
+def _corrupt_offset_of_share_hashes(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the share
+    hash tree within the share data will have one bit flipped or else will be
+    changed to a random value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x1c past the header; v2: 8-byte field at 0x34.
+    if version == 1:
+        return corrupt_field(data, 0x0c+0x1c, 4)
+    return corrupt_field(data, 0x0c+0x34, 8)
+
+def _corrupt_offset_of_uri_extension(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the uri
+    extension will have one bit flipped or else will be changed to a random
+    value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # v1: 4-byte field at 0x20 past the header; v2: 8-byte field at 0x3c.
+    if version == 1:
+        return corrupt_field(data, 0x0c+0x20, 4)
+    return corrupt_field(data, 0x0c+0x3c, 8)
+
+def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
+    """Scramble the file data -- the field showing the offset of the uri
+    extension will be set to the size of the file minus 3. This means when
+    the client tries to read the length field from that location it will get
+    a short read -- the result string will be only 3 bytes long, not the 4 or
+    8 bytes necessary to do a successful struct.unpack."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # The "-0x0c" in here is to skip the server-side header in the share
+    # file, which the client doesn't see when seeking and reading.
+    if sharevernum == 1:
+        # v1: overwrite the 4-byte uri-extension offset field at 0x2c.
+        if debug:
+            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
+        return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
+    else:
+        # v2: overwrite the 8-byte uri-extension offset field at 0x48.
+        if debug:
+            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
+        return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
+
+def _corrupt_mutable_share_data(data, debug=False):
+    """Corrupt the share_data section of a v0 SDMF mutable share -- the
+    region between the share_data and enc_privkey offsets has one bit
+    flipped or is overwritten with random bytes."""
+    # The share file must start with the mutable-share magic number.
+    prefix = data[:32]
+    assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
+    data_offset = MutableShareFile.DATA_OFFSET
+    # The first byte of the client-visible share encodes the share type;
+    # "\x00" marks SDMF.
+    sharetype = data[data_offset:data_offset+1]
+    assert sharetype == "\x00", "non-SDMF mutable shares not supported"
+    (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
+     ig_datalen, offsets) = unpack_header(data[data_offset:])
+    assert version == 0, "this function only handles v0 SDMF files"
+    # Corrupt the whole share_data region; it ends where enc_privkey begins.
+    start = data_offset + offsets["share_data"]
+    length = data_offset + offsets["enc_privkey"] - start
+    return corrupt_field(data, start, length)
+
+def _corrupt_share_data(data, debug=False):
+    """Scramble the file data -- the field containing the share data itself
+    will have one bit flipped or else will be changed to a random value."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways, not v%d." % sharevernum
+    if sharevernum == 1:
+        # v1: 4-byte data-size field at 0x08 past the header; the share
+        # data itself starts at 0x24 past the header.
+        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
+        return corrupt_field(data, 0x0c+0x24, sharedatasize)
+    else:
+        # v2: 8-byte data-size field at 0x0c past the header (matching
+        # _corrupt_size_of_sharedata).  The previous slice
+        # [0x0c+0x08:0x0c+0x0c+8] was 12 bytes long, which makes
+        # struct.unpack(">Q", ...) raise struct.error.  The share data
+        # itself starts at 0x44 past the header.
+        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
+        return corrupt_field(data, 0x0c+0x44, sharedatasize)
+
+def _corrupt_share_data_last_byte(data, debug=False):
+    """Scramble the file data -- flip all bits of the last byte."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways, not v%d." % sharevernum
+    if sharevernum == 1:
+        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
+        offset = 0x0c+0x24+sharedatasize-1
+    else:
+        # The 8-byte v2 data-size field lives at 0x0c past the header
+        # (matching _corrupt_size_of_sharedata).  The previous slice
+        # [0x0c+0x08:0x0c+0x0c+8] was 12 bytes long, which makes
+        # struct.unpack(">Q", ...) raise struct.error.
+        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
+        offset = 0x0c+0x44+sharedatasize-1
+
+    # Invert every bit of the final share-data byte.
+    newdata = data[:offset] + chr(ord(data[offset])^0xFF) + data[offset+1:]
+    if debug:
+        log.msg("testing: flipping all bits of byte at offset %d: %r, newdata: %r" % (offset, data[offset], newdata[offset]))
+    return newdata
+
+def _corrupt_crypttext_hash_tree(data, debug=False):
+    """Scramble the file data -- the field containing the crypttext hash tree
+    will have one bit flipped or else will be changed to a random value.
+    """
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # The crypttext hash tree runs from its own offset up to the block hash
+    # tree's offset; both offsets are read from the share's offset table.
+    if version == 1:
+        start = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
+        end = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
+    else:
+        start = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
+        end = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
+    return corrupt_field(data, 0x0c+start, end-start, debug=debug)
+
+def _corrupt_crypttext_hash_tree_byte_x221(data, debug=False):
+    """Scramble the file data -- the byte at offset 0x221 will have its 7th
+    (b1) bit flipped.
+    """
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    if debug:
+        log.msg("original data: %r" % (data,))
+    # The trailing slice previously started at 0x0c+0x2210+1 (a typo for
+    # 0x0c+0x221+1), which silently deleted every byte between 0x0c+0x222
+    # and 0x0c+0x2210.  Flip one bit and keep the rest of the share intact.
+    return data[:0x0c+0x221] + chr(ord(data[0x0c+0x221])^0x02) + data[0x0c+0x221+1:]
+
+def _corrupt_block_hashes(data, debug=False):
+    """Scramble the file data -- the field containing the block hash tree
+    will have one bit flipped or else will be changed to a random value.
+    """
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # The block hash tree runs from its own offset up to the share hash
+    # chain's offset; both offsets come from the share's offset table.
+    if version == 1:
+        start = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
+        end = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
+    else:
+        start = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
+        end = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
+    return corrupt_field(data, 0x0c+start, end-start)
+
+def _corrupt_share_hashes(data, debug=False):
+    """Scramble the file data -- the field containing the share hash chain
+    will have one bit flipped or else will be changed to a random value.
+    """
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # The share hash chain runs from its own offset up to the uri-extension
+    # offset; both offsets come from the share's offset table.
+    if version == 1:
+        start = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
+        end = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
+    else:
+        start = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
+        end = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
+    return corrupt_field(data, 0x0c+start, end-start)
+
+def _corrupt_length_of_uri_extension(data, debug=False):
+    """Scramble the file data -- the field showing the length of the uri
+    extension will have one bit flipped or else will be changed to a random
+    value."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    if sharevernum == 1:
+        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
+        # The offset read from the share is relative to the start of the
+        # client-visible share, so add the 0x0c server-side header -- the
+        # v1 branch previously omitted it and corrupted the wrong bytes
+        # (the v2 branch and every sibling corrupter already add it).
+        return corrupt_field(data, 0x0c+uriextoffset, 4)
+    else:
+        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
+        return corrupt_field(data, 0x0c+uriextoffset, 8)
+
+def _corrupt_uri_extension(data, debug=False):
+    """Scramble the file data -- the field containing the uri extension will
+    have one bit flipped or else will be changed to a random value."""
+    version = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert version in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
+    # Read the uri-extension offset from the offset table, then the length
+    # field stored there, and corrupt that many bytes at the offset.
+    if version == 1:
+        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
+        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
+    else:
+        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
+        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]
+    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)