import os, re, base64
from cStringIO import StringIO
+
from twisted.trial import unittest
from twisted.internet import defer, reactor
+
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
from allmydata.util import base32, consumer, fileutil, mathutil
+from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
ssk_pubkey_fingerprint_hash
from allmydata.util.consumer import MemoryConsumer
from allmydata.scripts import debug
from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
-from allmydata.mutable.common import ResponseCache, \
+from allmydata.mutable.common import \
MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
NotEnoughServersError, CorruptShareError
dumped = servermap.dump(StringIO())
self.failUnlessIn("3-of-10", dumped.getvalue())
d.addCallback(_then)
- # Now overwrite the contents with some new contents. We want
+ # Now overwrite the contents with some new contents. We want
# to make them big enough to force the file to be uploaded
# in more than one segment.
big_contents = "contents1" * 100000 # about 900 KiB
# before, they need to be big enough to force multiple
# segments, so that we make the downloader deal with
# multiple segments.
- bigger_contents = "contents2" * 1000000 # about 9MiB
+ bigger_contents = "contents2" * 1000000 # about 9MiB
bigger_contents_uploadable = MutableData(bigger_contents)
d.addCallback(lambda ignored:
n.overwrite(bigger_contents_uploadable))
d.addCallback(_created)
return d
-
- def test_response_cache_memory_leak(self):
- d = self.nodemaker.create_mutable_file("contents")
- def _created(n):
- d = n.download_best_version()
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
- d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
-
- def _check_cache(expected):
- # The total size of cache entries should not increase on the second download;
- # in fact the cache contents should be identical.
- d2 = n.download_best_version()
- d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
- return d2
- d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
- return d
- d.addCallback(_created)
- return d
-
def test_create_with_initial_contents_function(self):
data = "initial contents"
def _make_contents(n):
d.addCallback(_remove_shares)
return d
+ def test_all_but_two_shares_vanished_updated_servermap(self):
+ # tests error reporting for ticket #1742
+ d = self.make_servermap()
+ def _remove_shares(servermap):
+ self._version = servermap.best_recoverable_version()
+ for shares in self._storage._peers.values()[2:]:
+ shares.clear()
+ return self.make_servermap(servermap)
+ d.addCallback(_remove_shares)
+ def _check(updated_servermap):
+ d1 = self.shouldFail(NotEnoughSharesError,
+ "test_all_but_two_shares_vanished_updated_servermap",
+ "ran out of servers",
+ self.do_download, updated_servermap, version=self._version)
+ return d1
+ d.addCallback(_check)
+ return d
+
def test_no_servers(self):
sb2 = make_storagebroker(num_peers=0)
# if there are no servers, then a MODE_READ servermap should come
def test_corrupt_all_encprivkey_late(self):
- # this should work for the same reason as above, but we corrupt
+ # this should work for the same reason as above, but we corrupt
# after the servermap update to exercise the error handling
# code.
# We need to remove the privkey from the node, or the retrieve
corrupt_early=False,
failure_checker=_check)
- def test_corrupt_all_block_hash_tree_late(self):
- def _check(res):
- f = res[0]
- self.failUnless(f.check(NotEnoughSharesError))
- return self._test_corrupt_all("block_hash_tree",
- "block hash tree failure",
- corrupt_early=False,
- failure_checker=_check)
-
def test_corrupt_all_block_late(self):
def _check(res):
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=False,
+ corrupt_early=True,
should_succeed=False))
return d
def test_corrupt_mdmf_block_hash_tree_late(self):
+ # Note - there is no SDMF counterpart to this test, as the SDMF
+ # files are guaranteed to have exactly one block, and therefore
+ # the block hash tree fits within the initial read (#1240).
d = self.publish_mdmf()
d.addCallback(lambda ignored:
self._test_corrupt_all(("block_hash_tree", 12 * 32),
"block hash tree failure",
- corrupt_early=True,
+ corrupt_early=False,
should_succeed=False))
return d
self.failUnlessEqual(old_shares, current_shares)
- def test_unrepairable_0shares(self):
- d = self.publish_one()
- def _delete_all_shares(ign):
+ def _test_whether_repairable(self, publisher, nshares, expected_result):
+ d = publisher()
+ def _delete_some_shares(ign):
shares = self._storage._peers
for peerid in shares:
- shares[peerid] = {}
- d.addCallback(_delete_all_shares)
+ for shnum in list(shares[peerid]):
+ if shnum >= nshares:
+ del shares[peerid][shnum]
+ d.addCallback(_delete_some_shares)
d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
+ def _check(cr):
+ self.failIf(cr.is_healthy())
+ self.failUnlessEqual(cr.is_recoverable(), expected_result)
+ return cr
d.addCallback(_check)
- return d
-
- def test_mdmf_unrepairable_0shares(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- shares[peerid] = {}
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
d.addCallback(lambda check_results: self._fn.repair(check_results))
- d.addCallback(lambda crr: self.failIf(crr.get_successful()))
+ d.addCallback(lambda crr: self.failUnlessEqual(crr.get_successful(), expected_result))
return d
+ def test_unrepairable_0shares(self):
+ return self._test_whether_repairable(self.publish_one, 0, False)
+
+ def test_mdmf_unrepairable_0shares(self):
+ return self._test_whether_repairable(self.publish_mdmf, 0, False)
def test_unrepairable_1share(self):
- d = self.publish_one()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 0:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_one, 1, False)
def test_mdmf_unrepairable_1share(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 0:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), False)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_mdmf, 1, False)
def test_repairable_5shares(self):
- d = self.publish_mdmf()
- def _delete_all_shares(ign):
- shares = self._storage._peers
- for peerid in shares:
- for shnum in list(shares[peerid]):
- if shnum > 4:
- del shares[peerid][shnum]
- d.addCallback(_delete_all_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check(crr):
- self.failUnlessEqual(crr.get_successful(), True)
- d.addCallback(_check)
- return d
+ return self._test_whether_repairable(self.publish_one, 5, True)
def test_mdmf_repairable_5shares(self):
- d = self.publish_mdmf()
+ return self._test_whether_repairable(self.publish_mdmf, 5, True)
+
+ def _test_whether_checkandrepairable(self, publisher, nshares, expected_result):
+ """
+ Like the _test_whether_repairable tests, but invoking check_and_repair
+ instead of invoking check and then invoking repair.
+ """
+ d = publisher()
def _delete_some_shares(ign):
shares = self._storage._peers
for peerid in shares:
for shnum in list(shares[peerid]):
- if shnum > 5:
+ if shnum >= nshares:
del shares[peerid][shnum]
d.addCallback(_delete_some_shares)
- d.addCallback(lambda ign: self._fn.check(Monitor()))
- def _check(cr):
- self.failIf(cr.is_healthy())
- self.failUnless(cr.is_recoverable())
- return cr
- d.addCallback(_check)
- d.addCallback(lambda check_results: self._fn.repair(check_results))
- def _check1(crr):
- self.failUnlessEqual(crr.get_successful(), True)
- d.addCallback(_check1)
+ d.addCallback(lambda ign: self._fn.check_and_repair(Monitor()))
+ d.addCallback(lambda crr: self.failUnlessEqual(crr.get_repair_successful(), expected_result))
return d
+ def test_unrepairable_0shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 0, False)
+
+ def test_mdmf_unrepairable_0shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 0, False)
+
+ def test_unrepairable_1share_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 1, False)
+
+ def test_mdmf_unrepairable_1share_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 1, False)
+
+ def test_repairable_5shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_one, 5, True)
+
+ def test_mdmf_repairable_5shares_checkandrepair(self):
+ return self._test_whether_checkandrepairable(self.publish_mdmf, 5, True)
+
def test_merge(self):
self.old_shares = []
# then mix up the shares, to make sure that download survives seeing
# a variety of encodings. This is actually kind of tricky to set up.
- contents1 = "Contents for encoding 1 (3-of-10) go here"
- contents2 = "Contents for encoding 2 (4-of-9) go here"
- contents3 = "Contents for encoding 3 (4-of-7) go here"
+ contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
+ contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
+ contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
# we make a retrieval object that doesn't know what encoding
# parameters to use
return d
-class Utils(unittest.TestCase):
- def test_cache(self):
- c = ResponseCache()
- # xdata = base62.b2a(os.urandom(100))[:100]
- xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
- ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
- c.add("v1", 1, 0, xdata)
- c.add("v1", 1, 2000, ydata)
- self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
- self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
- self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
- self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
- self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
- self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
- self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
- self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
-
- # test joining fragments
- c = ResponseCache()
- c.add("v1", 1, 0, xdata[:10])
- c.add("v1", 1, 10, xdata[10:20])
- self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
-
class Exceptions(unittest.TestCase):
def test_repr(self):
nmde = NeedMoreDataError(100, 50, 100)
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
+
class SameKeyGenerator:
def __init__(self, pubkey, privkey):
self.pubkey = pubkey
self.basedir = "mutable/Problems/test_retrieve_surprise"
self.set_up_grid()
nm = self.g.clients[0].nodemaker
- d = nm.create_mutable_file(MutableData("contents 1"))
+ d = nm.create_mutable_file(MutableData("contents 1"*4000))
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
# now attempt to retrieve the old version with the old servermap.
# This will look like someone has changed the file since we
# updated the servermap.
- d.addCallback(lambda res: n._cache._clear())
d.addCallback(lambda res: log.msg("starting doomed read"))
d.addCallback(lambda res:
self.shouldFail(NotEnoughSharesError,
fso = debug.FindSharesOptions()
storage_index = base32.b2a(n.get_storage_index())
fso.si_s = storage_index
- fso.nodedirs = [unicode(os.path.dirname(os.path.abspath(storedir)))
+ fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(unicode(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
def setUp(self):
GridTestMixin.setUp(self)
self.basedir = self.mktemp()
- self.set_up_grid()
+ self.set_up_grid(num_servers=13)
self.c = self.g.clients[0]
self.nm = self.c.nodemaker
self.data = "testdata " * 100000 # about 900 KiB; MDMF