import os, re, base64
from cStringIO import StringIO
+
from twisted.trial import unittest
from twisted.internet import defer, reactor
+
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
from allmydata.util import base32, consumer, fileutil, mathutil
+from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
ssk_pubkey_fingerprint_hash
from allmydata.util.consumer import MemoryConsumer
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
from allmydata.test.no_network import GridTestMixin
-from foolscap.api import eventually, fireEventually
+from foolscap.api import eventually, fireEventually, flushEventualQueue
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.common import storage_index_to_dir
from allmydata.scripts import debug
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
-from allmydata.mutable.common import ResponseCache, \
+from allmydata.mutable.common import \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
NotEnoughServersError, CorruptShareError
dumped = servermap.dump(StringIO())
self.failUnlessIn("3-of-10", dumped.getvalue())
d.addCallback(_then)
- # Now overwrite the contents with some new contents. We want
+ # Now overwrite the contents with some new contents. We want
# to make them big enough to force the file to be uploaded
# in more than one segment.
big_contents = "contents1" * 100000 # about 900 KiB
# before, they need to be big enough to force multiple
# segments, so that we make the downloader deal with
# multiple segments.
- bigger_contents = "contents2" * 1000000 # about 9MiB
+ bigger_contents = "contents2" * 1000000 # about 9MiB
bigger_contents_uploadable = MutableData(bigger_contents)
d.addCallback(lambda ignored:
n.overwrite(bigger_contents_uploadable))
d.addCallback(_created)
return d
-
- def test_response_cache_memory_leak(self):
- d = self.nodemaker.create_mutable_file("contents")
- def _created(n):
- d = n.download_best_version()
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
- d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
-
- def _check_cache(expected):
- # The total size of cache entries should not increase on the second download;
- # in fact the cache contents should be identical.
- d2 = n.download_best_version()
- d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
- return d2
- d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
- return d
- d.addCallback(_created)
- return d
-
def test_create_with_initial_contents_function(self):
data = "initial contents"
def _make_contents(n):
d.addCallback(_created)
return d
- def publish_mdmf(self):
+ def publish_mdmf(self, data=None):
# like publish_one, except that the result is guaranteed to be
# an MDMF file.
# self.CONTENTS should have more than one segment.
- self.CONTENTS = "This is an MDMF file" * 100000
+ if data is None:
+ data = "This is an MDMF file" * 100000
+ self.CONTENTS = data
self.uploadable = MutableData(self.CONTENTS)
self._storage = FakeStorage()
self._nodemaker = make_nodemaker(self._storage)
return d
- def publish_sdmf(self):
+ def publish_sdmf(self, data=None):
# like publish_one, except that the result is guaranteed to be
# an SDMF file
- self.CONTENTS = "This is an SDMF file" * 1000
+ if data is None:
+ data = "This is an SDMF file" * 1000
+ self.CONTENTS = data
self.uploadable = MutableData(self.CONTENTS)
self._storage = FakeStorage()
self._nodemaker = make_nodemaker(self._storage)
d.addCallback(_created)
return d
- def publish_empty_sdmf(self):
- self.CONTENTS = ""
- self.uploadable = MutableData(self.CONTENTS)
- self._storage = FakeStorage()
- self._nodemaker = make_nodemaker(self._storage, keysize=None)
- self._storage_broker = self._nodemaker.storage_broker
- d = self._nodemaker.create_mutable_file(self.uploadable,
- version=SDMF_VERSION)
- def _created(node):
- self._fn = node
- self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
- d.addCallback(_created)
- return d
-
def publish_multiple(self, version=0):
self.CONTENTS = ["Contents 0",
d.addCallback(_remove_shares)
return d
+ def test_all_but_two_shares_vanished_updated_servermap(self):
+ # tests error reporting for ticket #1742
+ # Publish (default 3-of-10 encoding), then wipe the share dicts of
+ # all but the first two servers so that too few shares remain, and
+ # confirm that downloading via the *updated* servermap fails with
+ # NotEnoughSharesError whose message mentions "ran out of servers".
+ d = self.make_servermap()
+ def _remove_shares(servermap):
+ # remember the version chosen before the shares vanish, so the
+ # doomed download below targets exactly that version
+ self._version = servermap.best_recoverable_version()
+ for shares in self._storage._peers.values()[2:]:
+ shares.clear()
+ # rebuild the servermap after the damage; the failure must be
+ # reported cleanly even with an up-to-date map
+ return self.make_servermap(servermap)
+ d.addCallback(_remove_shares)
+ def _check(updated_servermap):
+ d1 = self.shouldFail(NotEnoughSharesError,
+ "test_all_but_two_shares_vanished_updated_servermap",
+ "ran out of servers",
+ self.do_download, updated_servermap, version=self._version)
+ return d1
+ d.addCallback(_check)
+ return d
+
def test_no_servers(self):
sb2 = make_storagebroker(num_peers=0)
# if there are no servers, then a MODE_READ servermap should come
def test_corrupt_all_encprivkey_late(self):
- # this should work for the same reason as above, but we corrupt
+ # this should work for the same reason as above, but we corrupt
# after the servermap update to exercise the error handling
# code.
# We need to remove the privkey from the node, or the retrieve
corrupt_early=False,
failure_checker=_check)
- def test_corrupt_all_block_hash_tree_late(self):
- def _check(res):
- f = res[0]
- self.failUnless(f.check(NotEnoughSharesError))
- return self._test_corrupt_all("block_hash_tree",
- "block hash tree failure",
- corrupt_early=False,
- failure_checker=_check)
-
def test_corrupt_all_block_late(self):
def _check(res):
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=False,
+ corrupt_early=True,
should_succeed=False))
return d
def test_corrupt_mdmf_block_hash_tree_late(self):
+ # Note - there is no SDMF counterpart to this test, as the SDMF
+ # files are guaranteed to have exactly one block, and therefore
+ # the block hash tree fits within the initial read (#1240).
d = self.publish_mdmf()
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=True,
+ corrupt_early=False,
should_succeed=False))
return d
return r
def check_expected_failure(self, r, expected_exception, substring, where):
- for (peerid, storage_index, shnum, f) in r.problems:
+ for (peerid, storage_index, shnum, f) in r.get_share_problems():
if f.check(expected_exception):
self.failUnless(substring in str(f),
"%s: substring '%s' not in '%s'" %
(where, substring, str(f)))
return
self.fail("%s: didn't see expected exception %s in problems %s" %
- (where, expected_exception, r.problems))
+ (where, expected_exception, r.get_share_problems()))
class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
"test_verify_mdmf_bad_encprivkey_uncheckable")
return d
+ def test_verify_sdmf_empty(self):
+ # Verify a zero-byte SDMF file: publish empty contents, run a
+ # verifying check, and expect a "good" result.
+ # NOTE(review): the label passed to check_good is "test_verify_sdmf",
+ # without the "_empty" suffix -- confirm whether that is intentional.
+ d = self.publish_sdmf("")
+ d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+ d.addCallback(self.check_good, "test_verify_sdmf")
+ # let any pending eventual-send callbacks run before the test ends
+ d.addCallback(flushEventualQueue)
+ return d
+
+ def test_verify_mdmf_empty(self):
+ # Same as test_verify_sdmf_empty, but for a zero-byte MDMF file.
+ d = self.publish_mdmf("")
+ d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+ d.addCallback(self.check_good, "test_verify_mdmf")
+ d.addCallback(flushEventualQueue)
+ return d
class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
self.failUnlessEqual(old_shares, current_shares)
- def test_unrepairable_0shares(self):
- d = self.publish_one()
- def _delete_all_shares(ign):
+ def _test_whether_repairable(self, publisher, nshares, expected_result):
+ # Shared helper: publish via `publisher`, delete every share whose
+ # share number is >= nshares (leaving nshares distinct share numbers
+ # on the grid), then assert that recoverability and the outcome of
+ # an explicit check-then-repair both equal expected_result.
+ d = publisher()
+ def _delete_some_shares(ign):
shares = self._storage._peers
for peerid in shares:
- shares[peerid] = {}
- d.addCallback(_delete_all_shares)
+ for shnum in list(shares[peerid]):
+ if shnum >= nshares:
+ del shares[peerid][shnum]
+ d.addCallback(_delete_some_shares)
d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
+ def _check(cr):
+ # the file is never healthy after deleting shares; whether it is
+ # still recoverable must match expected_result
+ self.failIf(cr.is_healthy())
+ self.failUnlessEqual(cr.is_recoverable(), expected_result)
+ return cr
d.addCallback(_check)
- return d
-
- def test_mdmf_unrepairable_0shares(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- shares[peerid] = {}
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
d.addCallback(lambda check_results: self._fn.repair(check_results))
- d.addCallback(lambda crr: self.failIf(crr.get_successful()))
+ d.addCallback(lambda crr: self.failUnlessEqual(crr.get_successful(), expected_result))
return d
+ # 0 or 1 remaining shares: unrepairable; 5 remaining shares: repairable
+ def test_unrepairable_0shares(self):
+ return self._test_whether_repairable(self.publish_one, 0, False)
+
+ def test_mdmf_unrepairable_0shares(self):
+ return self._test_whether_repairable(self.publish_mdmf, 0, False)
def test_unrepairable_1share(self):
- d = self.publish_one()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 0:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_one, 1, False)
def test_mdmf_unrepairable_1share(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 0:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_mdmf, 1, False)
def test_repairable_5shares(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 4:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), True)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_one, 5, True)
def test_mdmf_repairable_5shares(self):
- d = self.publish_mdmf()
+ return self._test_whether_repairable(self.publish_mdmf, 5, True)
+
+ def _test_whether_checkandrepairable(self, publisher, nshares, expected_result):
+ """
+ Like the _test_whether_repairable tests, but invoking check_and_repair
+ instead of invoking check and then invoking repair.
+ """
+ d = publisher()
def _delete_some_shares(ign):
shares = self._storage._peers
for peerid in shares:
for shnum in list(shares[peerid]):
- if shnum > 5:
+ # keep only share numbers below nshares
+ if shnum >= nshares:
del shares[peerid][shnum]
d.addCallback(_delete_some_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- def _check(cr):
- self.failIf(cr.is_healthy())
- self.failUnless(cr.is_recoverable())
- return cr
- d.addCallback(_check)
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check1(crr):
- self.failUnlessEqual(crr.get_successful(), True)
- d.addCallback(_check1)
+ # single combined call; only the repair outcome is asserted here
+ d.addCallback(lambda ign: self._fn.check_and_repair(Monitor()))
+ d.addCallback(lambda crr: self.failUnlessEqual(crr.get_repair_successful(), expected_result))
return d
+ # same share-count cases as the check-then-repair variants above
+ def test_unrepairable_0shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 0, False)
+
+ def test_mdmf_unrepairable_0shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 0, False)
+
+ def test_unrepairable_1share_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 1, False)
+
+ def test_mdmf_unrepairable_1share_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 1, False)
+
+ def test_repairable_5shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 5, True)
+
+ def test_mdmf_repairable_5shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 5, True)
+
def test_merge(self):
self.old_shares = []
# In the buggy version, the check that precedes the retrieve+publish
# cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the
# privkey that repair needs.
- d = self.publish_empty_sdmf()
+ d = self.publish_sdmf("")
def _delete_one_share(ign):
shares = self._storage._peers
for peerid in shares:
# then mix up the shares, to make sure that download survives seeing
# a variety of encodings. This is actually kind of tricky to set up.
- contents1 = "Contents for encoding 1 (3-of-10) go here"
- contents2 = "Contents for encoding 2 (4-of-9) go here"
- contents3 = "Contents for encoding 3 (4-of-7) go here"
+ contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
+ contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
+ contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
# we make a retrieval object that doesn't know what encoding
# parameters to use
return d
-class Utils(unittest.TestCase):
- def test_cache(self):
- c = ResponseCache()
- # xdata = base62.b2a(os.urandom(100))[:100]
- xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
- ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
- c.add("v1", 1, 0, xdata)
- c.add("v1", 1, 2000, ydata)
- self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
- self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
- self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
- self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
- self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
- self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
- self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
-
- # test joining fragments
- c = ResponseCache()
- c.add("v1", 1, 0, xdata[:10])
- c.add("v1", 1, 10, xdata[10:20])
- self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
-
class Exceptions(unittest.TestCase):
def test_repr(self):
nmde = NeedMoreDataError(100, 50, 100)
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
+
class SameKeyGenerator:
def __init__(self, pubkey, privkey):
self.pubkey = pubkey
self.basedir = "mutable/Problems/test_retrieve_surprise"
self.set_up_grid()
nm = self.g.clients[0].nodemaker
- d = nm.create_mutable_file(MutableData("contents 1"))
+ d = nm.create_mutable_file(MutableData("contents 1"*4000))
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
# now attempt to retrieve the old version with the old servermap.
# This will look like someone has changed the file since we
# updated the servermap.
- d.addCallback(lambda res: n._cache._clear())
d.addCallback(lambda res: log.msg("starting doomed read"))
d.addCallback(lambda res:
self.shouldFail(NotEnoughSharesError,
self.c = self.g.clients[0]
self.nm = self.c.nodemaker
self.data = "test data" * 100000 # about 900 KiB; MDMF
- self.small_data = "test data" * 10 # about 90 B; SDMF
+ self.small_data = "test data" * 10 # 90 B; SDMF
- def do_upload_mdmf(self):
- d = self.nm.create_mutable_file(MutableData(self.data),
+ def do_upload_mdmf(self, data=None):
+ if data is None:
+ data = self.data
+ d = self.nm.create_mutable_file(MutableData(data),
version=MDMF_VERSION)
def _then(n):
assert isinstance(n, MutableFileNode)
d.addCallback(_then)
return d
- def do_upload_sdmf(self):
- d = self.nm.create_mutable_file(MutableData(self.small_data))
+ def do_upload_sdmf(self, data=None):
+ if data is None:
+ data = self.small_data
+ d = self.nm.create_mutable_file(MutableData(data))
def _then(n):
assert isinstance(n, MutableFileNode)
assert n._protocol_version == SDMF_VERSION
fso = debug.FindSharesOptions()
storage_index = base32.b2a(n.get_storage_index())
fso.si_s = storage_index
- fso.nodedirs = [unicode(os.path.dirname(os.path.abspath(storedir)))
+ fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(unicode(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
return d
- def test_partial_read(self):
- d = self.do_upload_mdmf()
- d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
- modes = [("start_on_segment_boundary",
- mathutil.next_multiple(128 * 1024, 3), 50),
- ("ending_one_byte_after_segment_boundary",
- mathutil.next_multiple(128 * 1024, 3)-50, 51),
- ("zero_length_at_start", 0, 0),
- ("zero_length_in_middle", 50, 0),
- ("zero_length_at_segment_boundary",
- mathutil.next_multiple(128 * 1024, 3), 0),
- ]
+ def _test_partial_read(self, node, expected, modes, step):
+ # Shared driver: run each (name, offset, length) mode in `modes` as
+ # a partial read against `node`, comparing results to slices of
+ # `expected`, then re-read the whole file `step` bytes at a time.
+ d = node.get_best_readable_version()
for (name, offset, length) in modes:
- d.addCallback(self._do_partial_read, name, offset, length)
- # then read only a few bytes at a time, and see that the results are
- # what we expect.
+ d.addCallback(self._do_partial_read, name, expected, offset, length)
+ # then read the whole thing, but only a few bytes at a time, and see
+ # that the results are what we expect.
def _read_data(version):
c = consumer.MemoryConsumer()
d2 = defer.succeed(None)
- for i in xrange(0, len(self.data), 10000):
- d2.addCallback(lambda ignored, i=i: version.read(c, i, 10000))
+ for i in xrange(0, len(expected), step):
+ d2.addCallback(lambda ignored, i=i: version.read(c, i, step))
d2.addCallback(lambda ignored:
- self.failUnlessEqual(self.data, "".join(c.chunks)))
+ self.failUnlessEqual(expected, "".join(c.chunks)))
return d2
d.addCallback(_read_data)
return d
- def _do_partial_read(self, version, name, offset, length):
+
+ def _do_partial_read(self, version, name, expected, offset, length):
+ # Read [offset, offset+length) -- or offset to EOF when length is
+ # None -- and compare against the matching slice of `expected`,
+ # printing a short diagnostic before failing on a mismatch.
c = consumer.MemoryConsumer()
d = version.read(c, offset, length)
- expected = self.data[offset:offset+length]
+ if length is None:
+ expected_range = expected[offset:]
+ else:
+ expected_range = expected[offset:offset+length]
d.addCallback(lambda ignored: "".join(c.chunks))
def _check(results):
- if results != expected:
- print
+ if results != expected_range:
+ print "read([%d]+%s) got %d bytes, not %d" % \
+ (offset, length, len(results), len(expected_range))
print "got: %s ... %s" % (results[:20], results[-20:])
- print "exp: %s ... %s" % (expected[:20], expected[-20:])
- self.fail("results[%s] != expected" % name)
+ print "exp: %s ... %s" % (expected_range[:20], expected_range[-20:])
+ self.fail("results[%s] != expected_range" % name)
return version # daisy-chained to next call
d.addCallback(_check)
return d
+ def test_partial_read_mdmf_0(self):
+ # zero-byte MDMF file: both a zero-length read and a read-to-EOF
+ # (length=None) must return the empty string
+ data = ""
+ d = self.do_upload_mdmf(data=data)
+ modes = [("all1", 0,0),
+ ("all2", 0,None),
+ ]
+ d.addCallback(self._test_partial_read, data, modes, 1)
+ return d
+
+ def test_partial_read_mdmf_large(self):
+ # multi-segment MDMF file: exercise reads that start or straddle a
+ # segment boundary, zero-length reads, and whole-file reads (both
+ # with an explicit length and with length=None)
+ segment_boundary = mathutil.next_multiple(128 * 1024, 3)
+ modes = [("start_on_segment_boundary", segment_boundary, 50),
+ ("ending_one_byte_after_segment_boundary", segment_boundary-50, 51),
+ ("zero_length_at_start", 0, 0),
+ ("zero_length_in_middle", 50, 0),
+ ("zero_length_at_segment_boundary", segment_boundary, 0),
+ ("complete_file1", 0, len(self.data)),
+ ("complete_file2", 0, None),
+ ]
+ d = self.do_upload_mdmf()
+ d.addCallback(self._test_partial_read, self.data, modes, 10000)
+ return d
+
+ def test_partial_read_sdmf_0(self):
+ # zero-byte SDMF file; same cases as the MDMF zero-byte test
+ data = ""
+ modes = [("all1", 0,0),
+ ("all2", 0,None),
+ ]
+ d = self.do_upload_sdmf(data=data)
+ d.addCallback(self._test_partial_read, data, modes, 1)
+ return d
+
+ def test_partial_read_sdmf_2(self):
+ # minimal two-byte SDMF file: single-byte reads at each position
+ # plus whole-file reads
+ data = "hi"
+ modes = [("one_byte", 0, 1),
+ ("last_byte", 1, 1),
+ ("last_byte2", 1, None),
+ ("complete_file", 0, 2),
+ ("complete_file2", 0, None),
+ ]
+ d = self.do_upload_sdmf(data=data)
+ d.addCallback(self._test_partial_read, data, modes, 1)
+ return d
+
+ def test_partial_read_sdmf_90(self):
+ # 90-byte SDMF file (the default self.small_data upload)
+ modes = [("start_at_middle", 50, 40),
+ ("start_at_middle2", 50, None),
+ ("zero_length_at_start", 0, 0),
+ ("zero_length_in_middle", 50, 0),
+ ("zero_length_at_end", 90, 0),
+ ("complete_file1", 0, None),
+ ("complete_file2", 0, 90),
+ ]
+ d = self.do_upload_sdmf()
+ d.addCallback(self._test_partial_read, self.small_data, modes, 10)
+ return d
+
+ def test_partial_read_sdmf_100(self):
+ # 100-byte SDMF file
+ data = "test data "*10
+ modes = [("start_at_middle", 50, 50),
+ ("start_at_middle2", 50, None),
+ ("zero_length_at_start", 0, 0),
+ ("zero_length_in_middle", 50, 0),
+ ("complete_file1", 0, 100),
+ ("complete_file2", 0, None),
+ ]
+ d = self.do_upload_sdmf(data=data)
+ d.addCallback(self._test_partial_read, data, modes, 10)
+ return d
+
def _test_read_and_download(self, node, expected):
d = node.get_best_readable_version()
def _read_data(version):
c = consumer.MemoryConsumer()
+ c2 = consumer.MemoryConsumer()
d2 = defer.succeed(None)
d2.addCallback(lambda ignored: version.read(c))
d2.addCallback(lambda ignored:
self.failUnlessEqual(expected, "".join(c.chunks)))
+
+ d2.addCallback(lambda ignored: version.read(c2, offset=0,
+ size=len(expected)))
+ d2.addCallback(lambda ignored:
+ self.failUnlessEqual(expected, "".join(c2.chunks)))
return d2
d.addCallback(_read_data)
d.addCallback(lambda ignored: node.download_best_version())
def setUp(self):
GridTestMixin.setUp(self)
self.basedir = self.mktemp()
- self.set_up_grid()
+ self.set_up_grid(num_servers=13)
self.c = self.g.clients[0]
self.nm = self.c.nodemaker
self.data = "testdata " * 100000 # about 900 KiB; MDMF
- self.small_data = "test data" * 10 # about 90 B; SDMF
+ self.small_data = "test data" * 10 # 90 B; SDMF
def do_upload_sdmf(self):