-import os, struct
+import struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
-from twisted.python import failure
-from allmydata import uri
-from allmydata.storage.server import StorageServer
+from allmydata import uri, client
+from allmydata.nodemaker import NodeMaker
from allmydata.immutable import download
-from allmydata.util import base32, idlib
+from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
-from allmydata.util.hashutil import tagged_hash
-from allmydata.util.fileutil import make_dirs
-from allmydata.interfaces import IURI, IMutableFileURI, IUploadable, \
- NotEnoughSharesError, IRepairResults, ICheckAndRepairResults
+from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
+ ssk_pubkey_fingerprint_hash
+from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
+ NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
+from allmydata.test.no_network import GridTestMixin
from foolscap.api import eventually, fireEventually
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker
import common_util as testutil
-# this "FastMutableFileNode" exists solely to speed up tests by using smaller
-# public/private keys. Once we switch to fast DSA-based keys, we can get rid
-# of this.
-
-class FastMutableFileNode(MutableFileNode):
- SIGNATURE_KEY_SIZE = 522
-
# this "FakeStorage" exists to put the share data in RAM and avoid using real
# network connections, both to speed up the tests and to reduce the amount of
# non-mutable.py code being exercised.
self._sequence = None
self._pending = {}
self._pending_timer = None
- self._special_answers = {}
def read(self, peerid, storage_index):
shares = self._peers.get(peerid, {})
- if self._special_answers.get(peerid, []):
- mode = self._special_answers[peerid].pop(0)
- if mode == "fail":
- shares = failure.Failure(IntentionalError())
- elif mode == "none":
- shares = {}
- elif mode == "normal":
- pass
if self._sequence is None:
return defer.succeed(shares)
d = defer.Deferred()
return fireEventually(answer)
-# our "FakeClient" has just enough functionality of the real Client to let
-# the tests run.
-
-class FakeClient:
- mutable_file_node_class = FastMutableFileNode
-
- def __init__(self, num_peers=10):
- self._storage = FakeStorage()
- self._num_peers = num_peers
- peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self.nodeid = "fakenodeid"
- self.storage_broker = StorageFarmBroker(None, True)
- for peerid in peerids:
- fss = FakeStorageServer(peerid, self._storage)
- self.storage_broker.test_add_server(peerid, fss)
-
- def get_storage_broker(self):
- return self.storage_broker
- def debug_break_connection(self, peerid):
- self.storage_broker.test_servers[peerid].broken = True
- def debug_remove_connection(self, peerid):
- self.storage_broker.test_servers.pop(peerid)
- def debug_get_connection(self, peerid):
- return self.storage_broker.test_servers[peerid]
-
- def get_encoding_parameters(self):
- return {"k": 3, "n": 10}
-
- def log(self, msg, **kw):
- return log.msg(msg, **kw)
-
- def get_renewal_secret(self):
- return "I hereby permit you to renew my files"
- def get_cancel_secret(self):
- return "I hereby permit you to cancel my leases"
-
- def create_mutable_file(self, contents=""):
- n = self.mutable_file_node_class(self)
- d = n.create(contents)
- d.addCallback(lambda res: n)
- return d
-
- def get_history(self):
- return None
-
- def create_node_from_uri(self, u, readcap=None):
- if not u:
- u = readcap
- u = IURI(u)
- assert IMutableFileURI.providedBy(u), u
- res = self.mutable_file_node_class(self).init_from_uri(u)
- return res
-
- def upload(self, uploadable):
- assert IUploadable.providedBy(uploadable)
- d = uploadable.get_size()
- d.addCallback(lambda length: uploadable.read(length))
- #d.addCallback(self.create_mutable_file)
- def _got_data(datav):
- data = "".join(datav)
- #newnode = FastMutableFileNode(self)
- return uri.LiteralFileURI(data)
- d.addCallback(_got_data)
- return d
-
-
def flip_bit(original, byte_offset):
return (original[:byte_offset] +
chr(ord(original[byte_offset]) ^ 0x01) +
shares[shnum] = flip_bit(data, real_offset)
return res
+def make_storagebroker(s=None, num_peers=10):
+    # Build a StorageFarmBroker populated with FakeStorageServer wrappers
+    # that all share a single FakeStorage instance 's' (created here if not
+    # provided), so mutable-file tests run entirely in RAM with no real
+    # network connections. Peerids are deterministic tagged hashes so test
+    # runs are reproducible.
+    if not s:
+        s = FakeStorage()
+    peerids = [tagged_hash("peerid", "%d" % i)[:20]
+               for i in range(num_peers)]
+    storage_broker = StorageFarmBroker(None, True)
+    for peerid in peerids:
+        fss = FakeStorageServer(peerid, s)
+        storage_broker.test_add_server(peerid, fss)
+    return storage_broker
+
+def make_nodemaker(s=None, num_peers=10):
+    # Return a NodeMaker wired to a fake in-RAM storage grid (see
+    # make_storagebroker). The 522-bit keysize matches the old
+    # FastMutableFileNode.SIGNATURE_KEY_SIZE that this patch removes: small
+    # RSA keys keep test runs fast. The 3-of-10 encoding parameters match
+    # what the removed FakeClient.get_encoding_parameters() advertised.
+    storage_broker = make_storagebroker(s, num_peers)
+    sh = client.SecretHolder("lease secret")
+    keygen = client.KeyGenerator()
+    keygen.set_default_keysize(522)
+    nodemaker = NodeMaker(storage_broker, sh, None,
+                          None, None, None,
+                          {"k": 3, "n": 10}, keygen)
+    return nodemaker
+
class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
# this used to be in Publish, but we removed the limit. Some of
# these tests test whether the new code correctly allows files
# larger than the limit.
OLD_MAX_SEGMENT_SIZE = 3500000
def setUp(self):
- self.client = FakeClient()
+ self._storage = s = FakeStorage()
+ self.nodemaker = make_nodemaker(s)
def test_create(self):
- d = self.client.create_mutable_file()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
- self.failUnless(isinstance(n, FastMutableFileNode))
+ self.failUnless(isinstance(n, MutableFileNode))
self.failUnlessEqual(n.get_storage_index(), n._storage_index)
- sb = self.client.get_storage_broker()
+ sb = self.nodemaker.storage_broker
peer0 = sorted(sb.get_all_serverids())[0]
- shnums = self.client._storage._peers[peer0].keys()
+ shnums = self._storage._peers[peer0].keys()
self.failUnlessEqual(len(shnums), 1)
d.addCallback(_created)
return d
def test_serialize(self):
- n = MutableFileNode(self.client)
+ n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
calls = []
def _callback(*args, **kwargs):
self.failUnlessEqual(args, (4,) )
return d
def test_upload_and_download(self):
- d = self.client.create_mutable_file()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
return d
def test_create_with_initial_contents(self):
- d = self.client.create_mutable_file("contents 1")
+ d = self.nodemaker.create_mutable_file("contents 1")
def _created(n):
d = n.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
def test_create_with_too_large_contents(self):
BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
- d = self.client.create_mutable_file(BIG)
+ d = self.nodemaker.create_mutable_file(BIG)
def _created(n):
d = n.overwrite(BIG)
return d
raise UncoordinatedWriteError("simulated")
return old_contents
- d = self.client.create_mutable_file("line1")
+ d = self.nodemaker.create_mutable_file("line1")
def _created(n):
d = n.modify(_modifier)
d.addCallback(lambda res: n.download_best_version())
giveuper._delay = 0.1
giveuper.factor = 1
- d = self.client.create_mutable_file("line1")
+ d = self.nodemaker.create_mutable_file("line1")
def _created(n):
d = n.modify(_modifier)
d.addCallback(lambda res: n.download_best_version())
return d
def test_upload_and_download_full_size_keys(self):
- self.client.mutable_file_node_class = MutableFileNode
- d = self.client.create_mutable_file()
+ self.nodemaker.key_generator = client.KeyGenerator()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
class MakeShares(unittest.TestCase):
def test_encrypt(self):
- c = FakeClient()
- fn = FastMutableFileNode(c)
+ nm = make_nodemaker()
CONTENTS = "some initial contents"
- d = fn.create(CONTENTS)
- def _created(res):
- p = Publish(fn, None)
+ d = nm.create_mutable_file(CONTENTS)
+ def _created(fn):
+ p = Publish(fn, nm.storage_broker, None)
p.salt = "SALT" * 4
p.readkey = "\x00" * 16
p.newdata = CONTENTS
return d
def test_generate(self):
- c = FakeClient()
- fn = FastMutableFileNode(c)
+ nm = make_nodemaker()
CONTENTS = "some initial contents"
- d = fn.create(CONTENTS)
- def _created(res):
- p = Publish(fn, None)
+ d = nm.create_mutable_file(CONTENTS)
+ def _created(fn):
+ self._fn = fn
+ p = Publish(fn, nm.storage_broker, None)
self._p = p
p.newdata = CONTENTS
p.required_shares = 3
self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
self.failUnlessEqual(IV, "SALT"*4)
self.failUnlessEqual(len(share_data), len("%07d" % 1))
- self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
+ self.failUnlessEqual(enc_privkey, self._fn.get_encprivkey())
d.addCallback(_generated)
return d
# later.
self.CONTENTS = "New contents go here" * 1000
num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS)
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage)
+ self._storage_broker = self._nodemaker.storage_broker
+ d = self._nodemaker.create_mutable_file(self.CONTENTS)
def _created(node):
self._fn = node
- self._fn2 = self._client.create_node_from_uri(node.get_uri())
+ self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
d.addCallback(_created)
return d
+
def publish_multiple(self):
self.CONTENTS = ["Contents 0",
"Contents 1",
"Contents 3b"]
self._copied_shares = {}
num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage)
+ d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
def _created(node):
self._fn = node
# now create multiple versions of the same file, and accumulate
return d
def _copy_shares(self, ignored, index):
- shares = self._client._storage._peers
+ shares = self._storage._peers
# we need a deep copy
new_shares = {}
for peerid in shares:
# versionmap maps shnums to which version (0,1,2,3,4) we want the
# share to be at. Any shnum which is left out of the map will stay at
# its current version.
- shares = self._client._storage._peers
+ shares = self._storage._peers
oldshares = self._copied_shares
for peerid in shares:
for shnum in shares[peerid]:
def setUp(self):
return self.publish_one()
- def make_servermap(self, mode=MODE_CHECK, fn=None):
+ def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None):
if fn is None:
fn = self._fn
- smu = ServermapUpdater(fn, Monitor(), ServerMap(), mode)
+ if sb is None:
+ sb = self._storage_broker
+ smu = ServermapUpdater(fn, sb, Monitor(),
+ ServerMap(), mode)
d = smu.update()
return d
def update_servermap(self, oldmap, mode=MODE_CHECK):
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+ oldmap, mode)
d = smu.update()
return d
# create a new file, which is large enough to knock the privkey out
# of the early part of the file
LARGE = "These are Larger contents" * 200 # about 5KB
- d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+ d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE))
def _created(large_fn):
- large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+ large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri())
return self.make_servermap(MODE_WRITE, large_fn2)
d.addCallback(_created)
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
self.failUnlessEqual(len(sm.shares_available()), 0)
def test_no_shares(self):
- self._client._storage._peers = {} # delete all shares
+ self._storage._peers = {} # delete all shares
ms = self.make_servermap
d = defer.succeed(None)
return sm
def test_not_quite_enough_shares(self):
- s = self._client._storage
+ s = self._storage
ms = self.make_servermap
num_shares = len(s._peers)
for peerid in s._peers:
def setUp(self):
return self.publish_one()
- def make_servermap(self, mode=MODE_READ, oldmap=None):
+ def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
if oldmap is None:
oldmap = ServerMap()
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ if sb is None:
+ sb = self._storage_broker
+ smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
d = smu.update()
return d
return d
def test_no_servers(self):
- c2 = FakeClient(0)
- self._fn._client = c2
+ sb2 = make_storagebroker(num_peers=0)
# if there are no servers, then a MODE_READ servermap should come
# back empty
- d = self.make_servermap()
+ d = self.make_servermap(sb=sb2)
def _check_servermap(servermap):
self.failUnlessEqual(servermap.best_recoverable_version(), None)
self.failIf(servermap.recoverable_versions())
test_no_servers.timeout = 15
def test_no_servers_download(self):
- c2 = FakeClient(0)
- self._fn._client = c2
+ sb2 = make_storagebroker(num_peers=0)
+ self._fn._storage_broker = sb2
d = self.shouldFail(UnrecoverableFileError,
"test_no_servers_download",
"no recoverable versions",
# anybody should not prevent a subsequent download from working.
# This isn't quite the webapi-driven test that #463 wants, but it
# should be close enough.
- self._fn._client = self._client
+ self._fn._storage_broker = self._storage_broker
return self._fn.download_best_version()
def _retrieved(new_contents):
self.failUnlessEqual(new_contents, self.CONTENTS)
d.addCallback(_check_results)
return d
+class DevNullDictionary(dict):
+    # A dict that silently discards every store. Tests install an instance
+    # as NodeMaker._node_cache to disable node caching, so that multiple
+    # create_from_cap() calls yield distinct node objects for the same file.
+    def __setitem__(self, key, value):
+        return
+
class MultipleEncodings(unittest.TestCase):
def setUp(self):
self.CONTENTS = "New contents go here"
- num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS)
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage, num_peers=20)
+ self._storage_broker = self._nodemaker.storage_broker
+ d = self._nodemaker.create_mutable_file(self.CONTENTS)
def _created(node):
self._fn = node
d.addCallback(_created)
def _encode(self, k, n, data):
# encode 'data' into a peerid->shares dict.
- fn2 = FastMutableFileNode(self._client)
- # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
- # and _fingerprint
fn = self._fn
- fn2.init_from_uri(fn.get_uri())
+ # disable the nodecache, since for these tests we explicitly need
+ # multiple nodes pointing at the same file
+ self._nodemaker._node_cache = DevNullDictionary()
+ fn2 = self._nodemaker.create_from_cap(fn.get_uri())
# then we copy over other fields that are normally fetched from the
# existing shares
fn2._pubkey = fn._pubkey
fn2._required_shares = k
fn2._total_shares = n
- s = self._client._storage
+ s = self._storage
s._peers = {} # clear existing storage
- p2 = Publish(fn2, None)
+ p2 = Publish(fn2, self._storage_broker, None)
d = p2.publish(data)
def _published(res):
shares = s._peers
def make_servermap(self, mode=MODE_READ, oldmap=None):
if oldmap is None:
oldmap = ServerMap()
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+ oldmap, mode)
d = smu.update()
return d
# we make a retrieval object that doesn't know what encoding
# parameters to use
- fn3 = FastMutableFileNode(self._client)
- fn3.init_from_uri(self._fn.get_uri())
+ fn3 = self._nodemaker.create_from_cap(self._fn.get_uri())
# now we upload a file through fn1, and grab its shares
d = self._encode(3, 10, contents1)
places = [2, 2, 3, 2, 1, 1, 1, 2]
sharemap = {}
- sb = self._client.get_storage_broker()
+ sb = self._storage_broker
for peerid in sorted(sb.get_all_serverids()):
peerid_s = shortnodeid_b2a(peerid)
which = places[shnum]
else:
which = "x"
- self._client._storage._peers[peerid] = peers = {}
+ self._storage._peers[peerid] = peers = {}
in_1 = shnum in self._shares1[peerid]
in_2 = shnum in self._shares2.get(peerid, {})
in_3 = shnum in self._shares3.get(peerid, {})
# now sort the sequence so that share 0 is returned first
new_sequence = [sharemap[shnum]
for shnum in sorted(sharemap.keys())]
- self._client._storage._sequence = new_sequence
+ self._storage._sequence = new_sequence
log.msg("merge done")
d.addCallback(_merge)
d.addCallback(lambda res: fn3.download_best_version())
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
-# we can't do this test with a FakeClient, since it uses FakeStorageServer
-# instances which always succeed. So we need a less-fake one.
-
-class IntentionalError(Exception):
- pass
-
-class LocalWrapper:
- def __init__(self, original):
- self.original = original
- self.broken = False
- self.post_call_notifier = None
- def callRemote(self, methname, *args, **kwargs):
- def _call():
- if self.broken:
- raise IntentionalError("I was asked to break")
- meth = getattr(self.original, "remote_" + methname)
- return meth(*args, **kwargs)
- d = fireEventually()
- d.addCallback(lambda res: _call())
- if self.post_call_notifier:
- d.addCallback(self.post_call_notifier, methname)
- return d
-
-class LessFakeClient(FakeClient):
-
- def __init__(self, basedir, num_peers=10):
- self._num_peers = num_peers
- peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self.storage_broker = StorageFarmBroker(None, True)
- for peerid in peerids:
- peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
- make_dirs(peerdir)
- ss = StorageServer(peerdir, peerid)
- lw = LocalWrapper(ss)
- self.storage_broker.test_add_server(peerid, lw)
- self.nodeid = "fakenodeid"
-
-
-class Problems(unittest.TestCase, testutil.ShouldFailMixin):
+class SameKeyGenerator:
+    # A key generator that always hands back the same pre-made RSA keypair
+    # (keysize is ignored). Tests use this to know a mutable file's
+    # storage index before the file is actually created, so they can break
+    # specific servers first (see Problems.test_bad_server).
+    def __init__(self, pubkey, privkey):
+        self.pubkey = pubkey
+        self.privkey = privkey
+    def generate(self, keysize=None):
+        return defer.succeed( (self.pubkey, self.privkey) )
+
+class FirstServerGetsKilled:
+    # A post_call_notifier: the first remote call to complete marks its
+    # server wrapper as broken, so later calls to that one server fail
+    # while the call that just finished still returns normally. retval is
+    # always passed through unchanged.
+    done = False
+    def notify(self, retval, wrapper, methname):
+        if not self.done:
+            wrapper.broken = True
+            self.done = True
+        return retval
+
+class FirstServerGetsDeleted:
+    # A post_call_notifier: remembers which server wrapper completed the
+    # first remote call ("silenced"), then makes that server's subsequent
+    # slot_testv_and_readv_and_writev queries answer as if its share had
+    # been deleted (success, but no shares: (True, {})). All other servers,
+    # and the first call itself, are unaffected.
+    def __init__(self):
+        self.done = False
+        self.silenced = None
+    def notify(self, retval, wrapper, methname):
+        if not self.done:
+            # this query will work, but later queries should think the share
+            # has been deleted
+            self.done = True
+            self.silenced = wrapper
+            return retval
+        if wrapper == self.silenced:
+            assert methname == "slot_testv_and_readv_and_writev"
+            return (True, {})
+        return retval
+
+class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
def test_publish_surprise(self):
- basedir = os.path.join("mutable/CollidingWrites/test_surprise")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_publish_surprise"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
return d
def test_retrieve_surprise(self):
- basedir = os.path.join("mutable/CollidingWrites/test_retrieve")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_retrieve_surprise"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
# upload using the old servermap. The last upload should fail with an
# UncoordinatedWriteError, because of the shares that didn't appear
# in the servermap.
- basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_unexpected_shares"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
self.old_map = smap
# now shut down one of the servers
peer0 = list(smap.make_sharemap()[0])[0]
- self.client.debug_remove_connection(peer0)
+ self.g.remove_server(peer0)
# then modify the file, leaving the old map untouched
log.msg("starting winning write")
return n.overwrite("contents 2")
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should
# not prevent an update from succeeding either.
- basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
- self.client = LessFakeClient(basedir, 20)
+ self.basedir = "mutable/Problems/test_bad_server"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+
# to make sure that one of the initial peers is broken, we have to
- # get creative. We create the keys, so we can figure out the storage
- # index, but we hold off on doing the initial publish until we've
- # broken the server on which the first share wants to be stored.
- n = FastMutableFileNode(self.client)
- d = defer.succeed(None)
- d.addCallback(n._generate_pubprivkeys, keysize=522)
- d.addCallback(n._generated)
+ # get creative. We create an RSA key and compute its storage-index.
+ # Then we make a KeyGenerator that always returns that one key, and
+ # use it to create the mutable file. This will get easier when we can
+ # use #467 static-server-selection to disable permutation and force
+ # the choice of server for share[0].
+
+ d = nm.key_generator.generate(522)
+ def _got_key( (pubkey, privkey) ):
+ nm.key_generator = SameKeyGenerator(pubkey, privkey)
+ pubkey_s = pubkey.serialize()
+ privkey_s = privkey.serialize()
+ u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey_s),
+ ssk_pubkey_fingerprint_hash(pubkey_s))
+ self._storage_index = u.storage_index
+ d.addCallback(_got_key)
def _break_peer0(res):
- si = n.get_storage_index()
- peerlist = self.client.storage_broker.get_servers_for_index(si)
+ si = self._storage_index
+ peerlist = nm.storage_broker.get_servers_for_index(si)
peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1]
connection0.broken = True
self.connection1 = connection1
d.addCallback(_break_peer0)
- # now let the initial publish finally happen
- d.addCallback(lambda res: n._upload("contents 1", None))
+ # now "create" the file, using the pre-established key, and let the
+ # initial publish finally happen
+ d.addCallback(lambda res: nm.create_mutable_file("contents 1"))
# that ought to work
- d.addCallback(lambda res: n.download_best_version())
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
- # now break the second peer
- def _break_peer1(res):
- self.connection1.broken = True
- d.addCallback(_break_peer1)
- d.addCallback(lambda res: n.overwrite("contents 2"))
- # that ought to work too
- d.addCallback(lambda res: n.download_best_version())
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
- def _explain_error(f):
- print f
- if f.check(NotEnoughServersError):
- print "first_error:", f.value.first_error
- return f
- d.addErrback(_explain_error)
+ def _got_node(n):
+ d = n.download_best_version()
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+ # now break the second peer
+ def _break_peer1(res):
+ self.connection1.broken = True
+ d.addCallback(_break_peer1)
+ d.addCallback(lambda res: n.overwrite("contents 2"))
+ # that ought to work too
+ d.addCallback(lambda res: n.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ def _explain_error(f):
+ print f
+ if f.check(NotEnoughServersError):
+ print "first_error:", f.value.first_error
+ return f
+ d.addErrback(_explain_error)
+ return d
+ d.addCallback(_got_node)
return d
def test_bad_server_overlap(self):
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should
# not prevent an update from succeeding either.
- basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
- self.client = LessFakeClient(basedir, 10)
- sb = self.client.get_storage_broker()
+ self.basedir = "mutable/Problems/test_bad_server_overlap"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ sb = nm.storage_broker
- peerids = list(sb.get_all_serverids())
- self.client.debug_break_connection(peerids[0])
+ peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
+ self.g.break_server(peerids[0])
- d = self.client.create_mutable_file("contents 1")
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = n.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
# now break one of the remaining servers
def _break_second_server(res):
- self.client.debug_break_connection(peerids[1])
+ self.g.break_server(peerids[1])
d.addCallback(_break_second_server)
d.addCallback(lambda res: n.overwrite("contents 2"))
# that ought to work too
def test_publish_all_servers_bad(self):
# Break all servers: the publish should fail
- basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
- self.client = LessFakeClient(basedir, 20)
- sb = self.client.get_storage_broker()
- for peerid in sb.get_all_serverids():
- self.client.debug_break_connection(peerid)
+ self.basedir = "mutable/Problems/test_publish_all_servers_bad"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.broken = True
+
d = self.shouldFail(NotEnoughServersError,
"test_publish_all_servers_bad",
"Ran out of non-bad servers",
- self.client.create_mutable_file, "contents")
+ nm.create_mutable_file, "contents")
return d
def test_publish_no_servers(self):
# no servers at all: the publish should fail
- basedir = os.path.join("mutable/CollidingWrites/publish_no_servers")
- self.client = LessFakeClient(basedir, 0)
+ self.basedir = "mutable/Problems/test_publish_no_servers"
+ self.set_up_grid(num_servers=0)
+ nm = self.g.clients[0].nodemaker
+
d = self.shouldFail(NotEnoughServersError,
"test_publish_no_servers",
"Ran out of non-bad servers",
- self.client.create_mutable_file, "contents")
+ nm.create_mutable_file, "contents")
return d
test_publish_no_servers.timeout = 30
def test_privkey_query_error(self):
# when a servermap is updated with MODE_WRITE, it tries to get the
# privkey. Something might go wrong during this query attempt.
- self.client = FakeClient(20)
+ # Exercise the code in _privkey_query_failed which tries to handle
+ # such an error.
+ self.basedir = "mutable/Problems/test_privkey_query_error"
+ self.set_up_grid(num_servers=20)
+ nm = self.g.clients[0].nodemaker
+ nm._node_cache = DevNullDictionary() # disable the nodecache
+
# we need some contents that are large enough to push the privkey out
# of the early part of the file
- LARGE = "These are Larger contents" * 200 # about 5KB
- d = self.client.create_mutable_file(LARGE)
+ LARGE = "These are Larger contents" * 2000 # about 50KB
+ d = nm.create_mutable_file(LARGE)
def _created(n):
self.uri = n.get_uri()
- self.n2 = self.client.create_node_from_uri(self.uri)
- # we start by doing a map update to figure out which is the first
- # server.
- return n.get_servermap(MODE_WRITE)
+ self.n2 = nm.create_from_cap(self.uri)
+
+ # When a mapupdate is performed on a node that doesn't yet know
+ # the privkey, a short read is sent to a batch of servers, to get
+ # the verinfo and (hopefully, if the file is short enough) the
+ # encprivkey. Our file is too large to let this first read
+ # contain the encprivkey. Each non-encprivkey-bearing response
+ # that arrives (until the node gets the encprivkey) will trigger
+ # a second read to specifically read the encprivkey.
+ #
+ # So, to exercise this case:
+ # 1. notice which server gets a read() call first
+ # 2. tell that server to start throwing errors
+ killer = FirstServerGetsKilled()
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.post_call_notifier = killer.notify
d.addCallback(_created)
- d.addCallback(lambda res: fireEventually(res))
- def _got_smap1(smap):
- peer0 = list(smap.make_sharemap()[0])[0]
- # we tell the server to respond to this peer first, so that it
- # will be asked for the privkey first
- self.client._storage._sequence = [peer0]
- # now we make the peer fail their second query
- self.client._storage._special_answers[peer0] = ["normal", "fail"]
- d.addCallback(_got_smap1)
+
# now we update a servermap from a new node (which doesn't have the
- # privkey yet, forcing it to use a separate privkey query). Each
- # query response will trigger a privkey query, and since we're using
- # _sequence to make the peer0 response come back first, we'll send it
- # a privkey query first, and _sequence will again ensure that the
- # peer0 query will also come back before the others, and then
- # _special_answers will make sure that the query raises an exception.
- # The whole point of these hijinks is to exercise the code in
- # _privkey_query_failed. Note that the map-update will succeed, since
- # we'll just get a copy from one of the other shares.
+ # privkey yet, forcing it to use a separate privkey query). Note that
+ # the map-update will succeed, since we'll just get a copy from one
+ # of the other shares.
d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
- # Using FakeStorage._sequence means there will be read requests still
- # floating around.. wait for them to retire
- def _cancel_timer(res):
- if self.client._storage._pending_timer:
- self.client._storage._pending_timer.cancel()
- return res
- d.addBoth(_cancel_timer)
+
return d
def test_privkey_query_missing(self):
# like test_privkey_query_error, but the shares are deleted by the
# second query, instead of raising an exception.
- self.client = FakeClient(20)
- LARGE = "These are Larger contents" * 200 # about 5KB
- d = self.client.create_mutable_file(LARGE)
+ self.basedir = "mutable/Problems/test_privkey_query_missing"
+ self.set_up_grid(num_servers=20)
+ nm = self.g.clients[0].nodemaker
+ LARGE = "These are Larger contents" * 2000 # about 50KB
+ nm._node_cache = DevNullDictionary() # disable the nodecache
+
+ d = nm.create_mutable_file(LARGE)
def _created(n):
self.uri = n.get_uri()
- self.n2 = self.client.create_node_from_uri(self.uri)
- return n.get_servermap(MODE_WRITE)
+ self.n2 = nm.create_from_cap(self.uri)
+ deleter = FirstServerGetsDeleted()
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.post_call_notifier = deleter.notify
d.addCallback(_created)
- d.addCallback(lambda res: fireEventually(res))
- def _got_smap1(smap):
- peer0 = list(smap.make_sharemap()[0])[0]
- self.client._storage._sequence = [peer0]
- self.client._storage._special_answers[peer0] = ["normal", "none"]
- d.addCallback(_got_smap1)
d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
- def _cancel_timer(res):
- if self.client._storage._pending_timer:
- self.client._storage._pending_timer.cancel()
- return res
- d.addBoth(_cancel_timer)
return d