from twisted.application import service
from twisted.web.error import Error as WebError
from foolscap.api import flushEventualQueue, fireEventually
-from allmydata import uri, dirnode, client
+from allmydata import uri, client
from allmydata.introducer.server import IntroducerNode
-from allmydata.interfaces import IMutableFileNode, IImmutableFileNode, \
- FileTooLargeError, NotEnoughSharesError, ICheckable
+from allmydata.interfaces import IMutableFileNode, IImmutableFileNode,\
+ NotEnoughSharesError, ICheckable, \
+ IMutableUploadable, SDMF_VERSION, \
+ MDMF_VERSION
from allmydata.check_results import CheckResults, CheckAndRepairResults, \
DeepCheckResults, DeepCheckAndRepairResults
-from allmydata.mutable.common import CorruptShareError
+from allmydata.storage_client import StubServer
from allmydata.mutable.layout import unpack_header
-from allmydata.storage.server import storage_index_to_dir
+from allmydata.mutable.publish import MutableData
from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
import allmydata.test.common_util as testutil
from allmydata import immutable
+TEST_RSA_KEY_SIZE = 522
def flush_but_dont_ignore(res):
d = flushEventualQueue()
"""I provide IImmutableFileNode, but all of my data is stored in a
class-level dictionary."""
implements(IImmutableFileNode)
- all_contents = {}
- bad_shares = {}
- def __init__(self, filecap):
+ def __init__(self, filecap, all_contents):
precondition(isinstance(filecap, (uri.CHKFileURI, uri.LiteralFileURI)), filecap)
+ self.all_contents = all_contents
self.my_uri = filecap
self.storage_index = self.my_uri.get_storage_index()
return self.storage_index
def check(self, monitor, verify=False, add_lease=False):
- r = CheckResults(self.my_uri, self.storage_index)
- is_bad = self.bad_shares.get(self.storage_index, None)
- data = {}
- data["count-shares-needed"] = 3
- data["count-shares-expected"] = 10
- data["count-good-share-hosts"] = 10
- data["count-wrong-shares"] = 0
- nodeid = "\x00"*20
- data["list-corrupt-shares"] = []
- data["sharemap"] = {1: [nodeid]}
- data["servers-responding"] = [nodeid]
- data["count-recoverable-versions"] = 1
- data["count-unrecoverable-versions"] = 0
- if is_bad:
- r.set_healthy(False)
- r.set_recoverable(True)
- data["count-shares-good"] = 9
- data["list-corrupt-shares"] = [(nodeid, self.storage_index, 0)]
- r.problems = failure.Failure(CorruptShareError(is_bad))
- else:
- r.set_healthy(True)
- r.set_recoverable(True)
- data["count-shares-good"] = 10
- r.problems = []
- r.set_data(data)
- r.set_needs_rebalancing(False)
+ s = StubServer("\x00"*20)
+ r = CheckResults(self.my_uri, self.storage_index,
+ healthy=True, recoverable=True,
+ needs_rebalancing=False,
+ count_shares_needed=3,
+ count_shares_expected=10,
+ count_shares_good=10,
+ count_good_share_hosts=10,
+ count_recoverable_versions=1,
+ count_unrecoverable_versions=0,
+ servers_responding=[s],
+ sharemap={1: [s]},
+ count_wrong_shares=0,
+ list_corrupt_shares=[],
+ count_corrupt_shares=0,
+ list_incompatible_shares=[],
+ count_incompatible_shares=0,
+ summary="",
+ report=[],
+ share_problems=[],
+ servermap=None)
return defer.succeed(r)
def check_and_repair(self, monitor, verify=False, add_lease=False):
d = self.check(verify)
except KeyError, le:
raise NotEnoughSharesError(le, 0, 3)
return len(data)
+ def get_current_size(self):
+ return defer.succeed(self.get_size())
def read(self, consumer, offset=0, size=None):
# we don't bother to call registerProducer/unregisterProducer,
consumer.write(data[start:end])
return consumer
+
+ def get_best_readable_version(self):
+ return defer.succeed(self)
+
+
+ def download_to_data(self):
+ return download_to_data(self)
+
+
+ download_best_version = download_to_data
+
+
+ def get_size_of_best_version(self):
+ return defer.succeed(self.get_size)
+
+
def make_chk_file_cap(size):
return uri.CHKFileURI(key=os.urandom(16),
uri_extension_hash=os.urandom(32),
def make_chk_file_uri(size):
return make_chk_file_cap(size).to_string()
-def create_chk_filenode(contents):
+def create_chk_filenode(contents, all_contents):
filecap = make_chk_file_cap(len(contents))
- n = FakeCHKFileNode(filecap)
- FakeCHKFileNode.all_contents[filecap.to_string()] = contents
+ n = FakeCHKFileNode(filecap, all_contents)
+ all_contents[filecap.to_string()] = contents
return n
implements(IMutableFileNode, ICheckable)
MUTABLE_SIZELIMIT = 10000
- all_contents = {}
- bad_shares = {}
def __init__(self, storage_broker, secret_holder,
- default_encoding_parameters, history):
+ default_encoding_parameters, history, all_contents):
+ self.all_contents = all_contents
+ self.file_types = {} # storage index => MDMF_VERSION or SDMF_VERSION
self.init_from_cap(make_mutable_file_cap())
- def create(self, contents, key_generator=None, keysize=None):
+ self._k = default_encoding_parameters['k']
+ self._segsize = default_encoding_parameters['max_segment_size']
+ def create(self, contents, key_generator=None, keysize=None,
+ version=SDMF_VERSION):
+ if version == MDMF_VERSION and \
+ isinstance(self.my_uri, (uri.ReadonlySSKFileURI,
+ uri.WriteableSSKFileURI)):
+ self.init_from_cap(make_mdmf_mutable_file_cap())
+ self.file_types[self.storage_index] = version
initial_contents = self._get_initial_contents(contents)
- if len(initial_contents) > self.MUTABLE_SIZELIMIT:
- raise FileTooLargeError("SDMF is limited to one segment, and "
- "%d > %d" % (len(initial_contents),
- self.MUTABLE_SIZELIMIT))
- self.all_contents[self.storage_index] = initial_contents
+ data = initial_contents.read(initial_contents.get_size())
+ data = "".join(data)
+ self.all_contents[self.storage_index] = data
return defer.succeed(self)
def _get_initial_contents(self, contents):
- if isinstance(contents, str):
- return contents
if contents is None:
- return ""
+ return MutableData("")
+
+ if IMutableUploadable.providedBy(contents):
+ return contents
+
assert callable(contents), "%s should be callable, not %s" % \
(contents, type(contents))
return contents(self)
def init_from_cap(self, filecap):
assert isinstance(filecap, (uri.WriteableSSKFileURI,
- uri.ReadonlySSKFileURI))
+ uri.ReadonlySSKFileURI,
+ uri.WriteableMDMFFileURI,
+ uri.ReadonlyMDMFFileURI))
self.my_uri = filecap
self.storage_index = self.my_uri.get_storage_index()
+ if isinstance(filecap, (uri.WriteableMDMFFileURI,
+ uri.ReadonlyMDMFFileURI)):
+ self.file_types[self.storage_index] = MDMF_VERSION
+
+ else:
+ self.file_types[self.storage_index] = SDMF_VERSION
+
return self
def get_cap(self):
return self.my_uri
return self.my_uri.get_readonly().to_string()
def get_verify_cap(self):
return self.my_uri.get_verify_cap()
+ def get_repair_cap(self):
+ if self.my_uri.is_readonly():
+ return None
+ return self.my_uri
def is_readonly(self):
return self.my_uri.is_readonly()
def is_mutable(self):
def get_storage_index(self):
return self.storage_index
+ def get_servermap(self, mode):
+ return defer.succeed(None)
+
+ def get_version(self):
+ assert self.storage_index in self.file_types
+ return self.file_types[self.storage_index]
+
def check(self, monitor, verify=False, add_lease=False):
- r = CheckResults(self.my_uri, self.storage_index)
- is_bad = self.bad_shares.get(self.storage_index, None)
- data = {}
- data["count-shares-needed"] = 3
- data["count-shares-expected"] = 10
- data["count-good-share-hosts"] = 10
- data["count-wrong-shares"] = 0
- data["list-corrupt-shares"] = []
- nodeid = "\x00"*20
- data["sharemap"] = {"seq1-abcd-sh0": [nodeid]}
- data["servers-responding"] = [nodeid]
- data["count-recoverable-versions"] = 1
- data["count-unrecoverable-versions"] = 0
- if is_bad:
- r.set_healthy(False)
- r.set_recoverable(True)
- data["count-shares-good"] = 9
- r.problems = failure.Failure(CorruptShareError("peerid",
- 0, # shnum
- is_bad))
- else:
- r.set_healthy(True)
- r.set_recoverable(True)
- data["count-shares-good"] = 10
- r.problems = []
- r.set_data(data)
- r.set_needs_rebalancing(False)
+ s = StubServer("\x00"*20)
+ r = CheckResults(self.my_uri, self.storage_index,
+ healthy=True, recoverable=True,
+ needs_rebalancing=False,
+ count_shares_needed=3,
+ count_shares_expected=10,
+ count_shares_good=10,
+ count_good_share_hosts=10,
+ count_recoverable_versions=1,
+ count_unrecoverable_versions=0,
+ servers_responding=[s],
+ sharemap={"seq1-abcd-sh0": [s]},
+ count_wrong_shares=0,
+ list_corrupt_shares=[],
+ count_corrupt_shares=0,
+ list_incompatible_shares=[],
+ count_incompatible_shares=0,
+ summary="",
+ report=[],
+ share_problems=[],
+ servermap=None)
return defer.succeed(r)
def check_and_repair(self, monitor, verify=False, add_lease=False):
return d
def download_best_version(self):
+ return defer.succeed(self._download_best_version())
+
+
+ def _download_best_version(self, ignored=None):
if isinstance(self.my_uri, uri.LiteralFileURI):
- return defer.succeed(self.my_uri.data)
+ return self.my_uri.data
if self.storage_index not in self.all_contents:
- return defer.fail(NotEnoughSharesError(None, 0, 3))
- return defer.succeed(self.all_contents[self.storage_index])
+ raise NotEnoughSharesError(None, 0, 3)
+ return self.all_contents[self.storage_index]
+
def overwrite(self, new_contents):
- if len(new_contents) > self.MUTABLE_SIZELIMIT:
- raise FileTooLargeError("SDMF is limited to one segment, and "
- "%d > %d" % (len(new_contents),
- self.MUTABLE_SIZELIMIT))
assert not self.is_readonly()
- self.all_contents[self.storage_index] = new_contents
+ new_data = new_contents.read(new_contents.get_size())
+ new_data = "".join(new_data)
+ self.all_contents[self.storage_index] = new_data
return defer.succeed(None)
def modify(self, modifier):
# this does not implement FileTooLargeError, but the real one does
def _modify(self, modifier):
assert not self.is_readonly()
old_contents = self.all_contents[self.storage_index]
- self.all_contents[self.storage_index] = modifier(old_contents, None, True)
+ new_data = modifier(old_contents, None, True)
+ self.all_contents[self.storage_index] = new_data
return None
+ # As actually implemented, MutableFilenode and MutableFileVersion
+ # are distinct. However, nothing in the webapi uses (yet) that
+ # distinction -- it just uses the unified download interface
+ # provided by get_best_readable_version and read. When we start
+ # doing cooler things like LDMF, we will want to revise this code to
+ # be less simplistic.
+ def get_best_readable_version(self):
+ return defer.succeed(self)
+
+
+ def get_best_mutable_version(self):
+ return defer.succeed(self)
+
+ # Ditto for this, which is an implementation of IWriteable.
+ # XXX: Declare that the same is implemented.
+ def update(self, data, offset):
+ assert not self.is_readonly()
+ def modifier(old, servermap, first_time):
+ new = old[:offset] + "".join(data.read(data.get_size()))
+ new += old[len(new):]
+ return new
+ return self.modify(modifier)
+
+
+ def read(self, consumer, offset=0, size=None):
+ data = self._download_best_version()
+ if size:
+ data = data[offset:offset+size]
+ consumer.write(data)
+ return defer.succeed(consumer)
+
+
def make_mutable_file_cap():
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
fingerprint=os.urandom(32))
-def make_mutable_file_uri():
- return make_mutable_file_cap().to_string()
+
+def make_mdmf_mutable_file_cap():
+ return uri.WriteableMDMFFileURI(writekey=os.urandom(16),
+ fingerprint=os.urandom(32))
+
+def make_mutable_file_uri(mdmf=False):
+ if mdmf:
+ uri = make_mdmf_mutable_file_cap()
+ else:
+ uri = make_mutable_file_cap()
+
+ return uri.to_string()
def make_verifier_uri():
return uri.SSKVerifierURI(storage_index=os.urandom(16),
fingerprint=os.urandom(32)).to_string()
-class FakeDirectoryNode(dirnode.DirectoryNode):
- """This offers IDirectoryNode, but uses a FakeMutableFileNode for the
- backing store, so it doesn't go to the grid. The child data is still
- encrypted and serialized, so this isn't useful for tests that want to
- look inside the dirnodes and check their contents.
- """
- filenode_class = FakeMutableFileNode
+def create_mutable_filenode(contents, mdmf=False, all_contents=None):
+ # XXX: All of these arguments are kind of stupid.
+ if mdmf:
+ cap = make_mdmf_mutable_file_cap()
+ else:
+ cap = make_mutable_file_cap()
+
+ encoding_params = {}
+ encoding_params['k'] = 3
+ encoding_params['max_segment_size'] = 128*1024
+
+ filenode = FakeMutableFileNode(None, None, encoding_params, None,
+ all_contents)
+ filenode.init_from_cap(cap)
+ if mdmf:
+ filenode.create(MutableData(contents), version=MDMF_VERSION)
+ else:
+ filenode.create(MutableData(contents), version=SDMF_VERSION)
+ return filenode
+
class LoggingServiceParent(service.MultiService):
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
-
class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# SystemTestMixin tests tend to be a lot of work, and we have a few
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
fileutil.make_dirs(iv_dir)
- f = open(os.path.join(iv_dir, "webport"), "w")
- f.write("tcp:0:interface=127.0.0.1\n")
- f.close()
+ fileutil.write(os.path.join(iv_dir, 'tahoe.cfg'),
+ "[node]\n" +
+ u"nickname = introducer \u263A\n".encode('utf-8') +
+ "web.port = tcp:0:interface=127.0.0.1\n")
if SYSTEM_TEST_CERTS:
os.mkdir(os.path.join(iv_dir, "private"))
f = open(os.path.join(iv_dir, "private", "node.pem"), "w")
self.key_generator_svc = KeyGeneratorService(kgsdir,
display_furl=False,
- default_key_size=522)
+ default_key_size=TEST_RSA_KEY_SIZE)
self.key_generator_svc.key_generator.pool_size = 4
self.key_generator_svc.key_generator.pool_refresh_delay = 60
self.add_service(self.key_generator_svc)
f.write(SYSTEM_TEST_CERTS[i+1])
f.close()
- def write(name, value):
- open(os.path.join(basedir, name), "w").write(value+"\n")
+ config = "[client]\n"
+ config += "introducer.furl = %s\n" % self.introducer_furl
+ if self.stats_gatherer_furl:
+ config += "stats_gatherer.furl = %s\n" % self.stats_gatherer_furl
+
+ nodeconfig = "[node]\n"
+ nodeconfig += (u"nickname = client %d \u263A\n" % (i,)).encode('utf-8')
+
if i == 0:
# clients[0] runs a webserver and a helper, no key_generator
- write("webport", "tcp:0:interface=127.0.0.1")
- write("run_helper", "yes")
- write("keepalive_timeout", "600")
- if i == 3:
+ config += nodeconfig
+ config += "web.port = tcp:0:interface=127.0.0.1\n"
+ config += "timeout.keepalive = 600\n"
+ config += "[helper]\n"
+ config += "enabled = True\n"
+ elif i == 3:
# clients[3] runs a webserver and uses a helper, uses
# key_generator
- write("webport", "tcp:0:interface=127.0.0.1")
- write("disconnect_timeout", "1800")
if self.key_generator_furl:
- kgf = "%s\n" % (self.key_generator_furl,)
- write("key_generator.furl", kgf)
- write("introducer.furl", self.introducer_furl)
- if self.stats_gatherer_furl:
- write("stats_gatherer.furl", self.stats_gatherer_furl)
+ config += "key_generator.furl = %s\n" % self.key_generator_furl
+ config += nodeconfig
+ config += "web.port = tcp:0:interface=127.0.0.1\n"
+ config += "timeout.disconnect = 1800\n"
+ else:
+ config += nodeconfig
+
+ fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
# give subclasses a chance to append lines to the node's tahoe.cfg
# files before they are launched.
# will have registered the helper furl).
c = self.add_service(client.Client(basedir=basedirs[0]))
self.clients.append(c)
- c.set_default_mutable_keysize(522)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
d = c.when_tub_ready()
def _ready(res):
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
f.close()
self.helper_furl = helper_furl
if self.numclients >= 4:
- f = open(os.path.join(basedirs[3],"helper.furl"), "w")
- f.write(helper_furl)
+ f = open(os.path.join(basedirs[3], 'tahoe.cfg'), 'ab+')
+ f.write(
+ "[client]\n"
+ "helper.furl = %s\n" % helper_furl)
f.close()
# this starts the rest of the clients
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
self.clients.append(c)
- c.set_default_mutable_keysize(522)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
log.msg("STARTING")
return self.wait_for_connections()
d.addCallback(_ready)
def _stopped(res):
new_c = client.Client(basedir=self.getdir("client%d" % num))
self.clients[num] = new_c
- new_c.set_default_mutable_keysize(522)
+ new_c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
self.add_service(new_c)
return new_c.when_tub_ready()
d.addCallback(_stopped)
basedir = self.getdir("client%d" % client_num)
if not os.path.isdir(basedir):
fileutil.make_dirs(basedir)
- open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ config = "[client]\n"
+ config += "introducer.furl = %s\n" % self.introducer_furl
if helper_furl:
- f = open(os.path.join(basedir, "helper.furl") ,"w")
- f.write(helper_furl+"\n")
- f.close()
+ config += "helper.furl = %s\n" % helper_furl
+ fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
c = client.Client(basedir=basedir)
self.clients.append(c)
- c.set_default_mutable_keysize(522)
+ c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
self.numclients += 1
if add_to_sparent:
c.setServiceParent(self.sparent)
if not c.connected_to_introducer():
return False
sb = c.get_storage_broker()
- if len(sb.get_all_servers()) != self.numclients:
+ if len(sb.get_connected_servers()) != self.numclients:
+ return False
+ up = c.getServiceNamed("uploader")
+ if up._helper_furl and not up._helper:
return False
return True
def wait_for_connections(self, ignored=None):
- # TODO: replace this with something that takes a list of peerids and
- # fires when they've all been heard from, instead of using a count
- # and a threshold
return self.poll(self._check_connections, timeout=200)
TEST_DATA="\x02"*(immutable.upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
-class ShareManglingMixin(SystemTestMixin):
-
- def setUp(self):
- # Set self.basedir to a temp dir which has the name of the current
- # test method in its name.
- self.basedir = self.mktemp()
-
- d = defer.maybeDeferred(SystemTestMixin.setUp, self)
- d.addCallback(lambda x: self.set_up_nodes())
-
- def _upload_a_file(ignored):
- cl0 = self.clients[0]
- # We need multiple segments to test crypttext hash trees that are
- # non-trivial (i.e. they have more than just one hash in them).
- cl0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
- # Tests that need to test servers of happiness using this should
- # set their own value for happy -- the default (7) breaks stuff.
- cl0.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
- d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
- def _after_upload(u):
- filecap = u.uri
- self.n = self.clients[1].create_node_from_uri(filecap)
- self.uri = uri.CHKFileURI.init_from_string(filecap)
- return cl0.create_node_from_uri(filecap)
- d2.addCallback(_after_upload)
- return d2
- d.addCallback(_upload_a_file)
-
- def _stash_it(filenode):
- self.filenode = filenode
- d.addCallback(_stash_it)
- return d
-
- def find_all_shares(self, unused=None):
- """Locate shares on disk. Returns a dict that maps
- (clientnum,sharenum) to a string that contains the share container
- (copied directly from the disk, containing leases etc). You can
- modify this dict and then call replace_shares() to modify the shares.
- """
- shares = {} # k: (i, sharenum), v: data
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- data = open(os.path.join(sharedir, dirp, fn), "rb").read()
- shares[(i, sharenum)] = data
-
- return shares
-
- def replace_shares(self, newshares, storage_index):
- """Replace shares on disk. Takes a dictionary in the same form
- as find_all_shares() returns."""
-
- for i, c in enumerate(self.clients):
- sharedir = c.getServiceNamed("storage").sharedir
- for (dirp, dirns, fns) in os.walk(sharedir):
- for fn in fns:
- try:
- sharenum = int(fn)
- except TypeError:
- # Whoops, I guess that's not a share file then.
- pass
- else:
- pathtosharefile = os.path.join(sharedir, dirp, fn)
- os.unlink(pathtosharefile)
- for ((clientnum, sharenum), newdata) in newshares.iteritems():
- if clientnum == i:
- fullsharedirp=os.path.join(sharedir, storage_index_to_dir(storage_index))
- fileutil.make_dirs(fullsharedirp)
- wf = open(os.path.join(fullsharedirp, str(sharenum)), "wb")
- wf.write(newdata)
- wf.close()
-
- def _delete_a_share(self, unused=None, sharenum=None):
- """ Delete one share. """
-
- shares = self.find_all_shares()
- ks = shares.keys()
- if sharenum is not None:
- k = [ key for key in shares.keys() if key[1] == sharenum ][0]
- else:
- k = random.choice(ks)
- del shares[k]
- self.replace_shares(shares, storage_index=self.uri.get_storage_index())
-
- return unused
-
- def _corrupt_a_share(self, unused, corruptor_func, sharenum):
- shares = self.find_all_shares()
- ks = [ key for key in shares.keys() if key[1] == sharenum ]
- assert ks, (shares.keys(), sharenum)
- k = ks[0]
- shares[k] = corruptor_func(shares[k])
- self.replace_shares(shares, storage_index=self.uri.get_storage_index())
- return corruptor_func
-
- def _corrupt_all_shares(self, unused, corruptor_func):
- """ All shares on disk will be corrupted by corruptor_func. """
- shares = self.find_all_shares()
- for k in shares.keys():
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return corruptor_func
-
- def _corrupt_a_random_share(self, unused, corruptor_func):
- """ Exactly one share on disk will be corrupted by corruptor_func. """
- shares = self.find_all_shares()
- ks = shares.keys()
- k = random.choice(ks)
- self._corrupt_a_share(unused, corruptor_func, k[1])
- return k[1]
-
- def _count_reads(self):
- sum_of_read_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_read_counts += counters.get('storage_server.read', 0)
- return sum_of_read_counts
-
- def _count_allocates(self):
- sum_of_allocate_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
- return sum_of_allocate_counts
-
- def _count_writes(self):
- sum_of_write_counts = 0
- for thisclient in self.clients:
- counters = thisclient.stats_provider.get_stats()['counters']
- sum_of_write_counts += counters.get('storage_server.write', 0)
- return sum_of_write_counts
-
- def _download_and_check_plaintext(self, unused=None):
- d = download_to_data(self.n)
- def _after_download(result):
- self.failUnlessEqual(result, TEST_DATA)
- d.addCallback(_after_download)
- return d
-
class ShouldFailMixin:
def shouldFail(self, expected_failure, which, substring,
callable, *args, **kwargs):
if substring:
message = repr(res.value.args[0])
self.failUnless(substring in message,
- "substring '%s' not in '%s'"
- % (substring, message))
+ "%s: substring '%s' not in '%s'"
+ % (which, substring, message))
else:
self.fail("%s was supposed to raise %s, not get '%s'" %
(which, expected_failure, res))
assert callable
def _validate(f):
if code is not None:
- self.failUnlessEqual(f.value.status, str(code))
+ self.failUnlessEqual(f.value.status, str(code), which)
if substring:
code_string = str(f)
self.failUnless(substring in code_string,
- "substring '%s' not in '%s'"
- % (substring, code_string))
+ "%s: substring '%s' not in '%s'"
+ % (which, substring, code_string))
response_body = f.value.response
if response_substring:
self.failUnless(response_substring in response_body,
- "response substring '%s' not in '%s'"
- % (response_substring, response_body))
+ "%s: response substring '%s' not in '%s'"
+ % (which, response_substring, response_body))
return response_body
d = defer.maybeDeferred(callable, *args, **kwargs)
d.addBoth(self._shouldHTTPError, which, _validate)
return corrupt_field(data, 0x0c+0x44, sharedatasize)
+def _corrupt_share_data_last_byte(data, debug=False):
+ """Scramble the file data -- flip all bits of the last byte."""
+ sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+ assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways, not v%d." % sharevernum
+ if sharevernum == 1:
+ sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
+ offset = 0x0c+0x24+sharedatasize-1
+ else:
+ sharedatasize = struct.unpack(">Q", data[0x0c+0x08:0x0c+0x0c+8])[0]
+ offset = 0x0c+0x44+sharedatasize-1
+
+ newdata = data[:offset] + chr(ord(data[offset])^0xFF) + data[offset+1:]
+ if debug:
+ log.msg("testing: flipping all bits of byte at offset %d: %r, newdata: %r" % (offset, data[offset], newdata[offset]))
+ return newdata
+
def _corrupt_crypttext_hash_tree(data, debug=False):
"""Scramble the file data -- the field containing the crypttext hash tree
will have one bit flipped or else will be changed to a random value.