import random, weakref, itertools, time
from zope.interface import implements
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
- d.addCallback(self._got_all_shareholders)
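+ # addBoth: _got_all_shareholders runs whether the share-finding
+ # step succeeded or failed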
+ d.addBoth(self._got_all_shareholders)
# now get the uri_extension block from somebody and integrity check
# it and parse and validate its contents
d.addCallback(self._obtain_uri_extension)
""" Once the number of buckets that I know about is >= K then I
callback the Deferred that I return.
- If all of the get_buckets deferreds have fired (whether callback or
- errback) and I still don't have enough buckets then I'll callback the
- Deferred that I return.
+ If all of the get_buckets deferreds have fired (whether callback
+ or errback) and I still don't have enough buckets then I'll also
+ callback -- not errback -- the Deferred that I return.
"""
- self._wait_for_enough_buckets_d = defer.Deferred()
+ wait_for_enough_buckets_d = defer.Deferred()
+ self._wait_for_enough_buckets_d = wait_for_enough_buckets_d
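+ # keep a local reference too: a response handler may fire and clear
+ # self._wait_for_enough_buckets_d before we reach the return below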
- self._queries_sent = 0
- self._responses_received = 0
- self._queries_failed = 0
sb = self._storage_broker
servers = sb.get_servers_for_index(self._storage_index)
if not servers:
raise NoServersError("broker gave us no servers!")
+
+ self._total_queries = len(servers)
+ self._responses_received = 0
+ self._queries_failed = 0
for (peerid,ss) in servers:
self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="rT03hg")
- self._queries_sent += 1
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
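+ # after each query resolves -- callback or errback -- check whether
+ # all of the queries have now been answered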
+ d.addBoth(self._check_got_all_responses)
+
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
- (self._responses_received,
- self._queries_sent))
- return self._wait_for_enough_buckets_d
+ (len(self._share_buckets),
+ self._verifycap.needed_shares))
+ return wait_for_enough_buckets_d
+
+ def _check_got_all_responses(self, ignored=None):
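+ # runs after every get_buckets query resolves: once the last query
+ # has been answered and we still don't have enough buckets, fire the
+ # waiting Deferred with False. callLater(0) makes its callbacks run
+ # in a fresh reactor turn rather than reentrantly from inside a
+ # response handler.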
+ assert (self._responses_received+self._queries_failed) <= self._total_queries
+ if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries:
+ reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False)
+ self._wait_for_enough_buckets_d = None
def _got_response(self, buckets, peerid):
+ self._responses_received += 1
self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
peerid=idlib.shortnodeid_b2a(peerid),
shnums=sorted(buckets.keys()),
level=log.NOISY, umid="o4uwFg")
- self._responses_received += 1
if self._results:
elapsed = time.time() - self._started
self._results.timings["servers_peer_selection"][peerid] = elapsed
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
- self._queries_sent))
+ self._total_queries))
for sharenum, bucket in buckets.iteritems():
b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
self.add_share_bucket(sharenum, b)
# If we now know about enough buckets to download the file, fire the
# waiting deferred. Then remove it from self so that we don't fire it
# again.
if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
- self._wait_for_enough_buckets_d.callback(True)
- self._wait_for_enough_buckets_d = None
-
- # Else, if we ran out of outstanding requests then fire it and
- # remove it from self.
- assert (self._responses_received+self._queries_failed) <= self._queries_sent
- if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
- self._wait_for_enough_buckets_d.callback(False)
+ reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
self._wait_for_enough_buckets_d = None
if self._results:
self._share_buckets.setdefault(sharenum, []).append(bucket)
def _got_error(self, f):
+ self._queries_failed += 1
level = log.WEIRD
if f.check(DeadReferenceError):
level = log.UNUSUAL
self.log("Error during get_buckets", failure=f, level=level,
umid="3uuBUQ")
- # If we ran out of outstanding requests then errback it and remove it
- # from self.
- self._queries_failed += 1
- assert (self._responses_received+self._queries_failed) <= self._queries_sent
- if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent:
- self._wait_for_enough_buckets_d.errback()
- self._wait_for_enough_buckets_d = None
def bucket_failed(self, vbucket):
shnum = vbucket.sharenum
import os.path
from zope.interface import implements
from twisted.application import service
-from twisted.internet import reactor
+from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException
from base64 import b32encode
def __init__(self, original):
self.original = original
self.broken = False
+ self.hung_until = None
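+ # while set to a Deferred, remote calls are queued until it fires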
self.post_call_notifier = None
self.disconnectors = {}
return a
args = tuple([wrap(a) for a in args])
kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
+
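+ # the actual "remote_*" method dispatch; deferred below while the
+ # wrapper is hung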
+ def _really_call():
+ meth = getattr(self.original, "remote_" + methname)
+ return meth(*args, **kwargs)
+
def _call():
if self.broken:
raise IntentionalError("I was asked to break")
- meth = getattr(self.original, "remote_" + methname)
- return meth(*args, **kwargs)
+ if self.hung_until:
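+ # hung: queue this call behind any others already chained on the
+ # shared 'hung_until' Deferred; each pending call gets its own
+ # result Deferred (d2)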
+ d2 = defer.Deferred()
+ self.hung_until.addCallback(lambda ign: _really_call())
+ self.hung_until.addCallback(lambda res: d2.callback(res))
+ def _err(res):
+ d2.errback(res)
+ return res
+ self.hung_until.addErrback(_err)
+ return d2
+ return _really_call()
+
d = fireEventually()
d.addCallback(lambda res: _call())
def _wrap_exception(f):
# asked to hold a share
self.servers_by_id[serverid].broken = True
+ def hang_server(self, serverid, until=None):
+ # hang the given server until 'until' fires (default: a fresh
+ # Deferred per call, i.e. hang until explicitly released; a
+ # Deferred default argument would be shared across calls)
+ if until is None:
+ until = defer.Deferred()
+ self.servers_by_id[serverid].hung_until = until
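+ # typical use from a test (cf. the hung-server tests below):
+ # recovered = defer.Deferred()
+ # self.g.hang_server(serverid, until=recovered)
+ # ...
+ # recovered.callback(None) # release the hung server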
+
+
class GridTestMixin:
def setUp(self):
self.s = service.MultiService()
--- /dev/null
+
+import os, shutil
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from twisted.python import failure
+from allmydata import uri
+from allmydata.util.consumer import download_to_data
+from allmydata.immutable import upload
+from allmydata.storage.common import storage_index_to_dir
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common import ShouldFailMixin
+from allmydata.interfaces import NotEnoughSharesError
+
+immutable_plaintext = "data" * 10000
+mutable_plaintext = "muta" * 10000
+
+class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
+ timeout = 30
+
+ def _break(self, servers):
+ for (id, ss) in servers:
+ self.g.break_server(id)
+
+ def _hang(self, servers, **kwargs):
+ for (id, ss) in servers:
+ self.g.hang_server(id, **kwargs)
+
+ def _delete_all_shares_from(self, servers):
+ serverids = [id for (id, ss) in servers]
+ for (i_shnum, i_serverid, i_sharefile) in self.shares:
+ if i_serverid in serverids:
+ os.unlink(i_sharefile)
+
+ # untested
+ def _pick_a_share_from(self, server):
+ (id, ss) = server
+ for (i_shnum, i_serverid, i_sharefile) in self.shares:
+ if i_serverid == id:
+ return (i_shnum, i_sharefile)
+ raise AssertionError("server %r had no shares" % server)
+
+ # untested
+ def _copy_all_shares_from(self, from_servers, to_server):
+ serverids = [id for (id, ss) in from_servers]
+ for (i_shnum, i_serverid, i_sharefile) in self.shares:
+ if i_serverid in serverids:
+ self._copy_share((i_shnum, i_sharefile), to_server)
+
+ # untested
+ def _copy_share(self, share, to_server):
+ (sharenum, sharefile) = share
+ (id, ss) = to_server
+ # FIXME: this doesn't work because we only have a LocalWrapper
+ shares_dir = os.path.join(ss.storedir, "shares")
+ si = uri.from_string(self.uri).get_storage_index()
+ si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
+ if not os.path.exists(si_dir):
+ os.makedirs(si_dir)
+ new_sharefile = os.path.join(si_dir, str(sharenum))
+ shutil.copy(sharefile, new_sharefile)
+ self.shares = self.find_shares(self.uri)
+ # Make sure that the storage server has the share.
+ self.failUnless((sharenum, ss.my_nodeid, new_sharefile)
+ in self.shares)
+
+ # untested
+ def _add_server(self, server_number, readonly=False):
+ ss = self.g.make_server(server_number, readonly)
+ self.g.add_server(server_number, ss)
+ self.shares = self.find_shares(self.uri)
+
+ def _set_up(self, testdir, num_clients=1, num_servers=10):
+ self.basedir = "download/" + testdir
+ self.set_up_grid(num_clients=num_clients, num_servers=num_servers)
+
+ self.c0 = self.g.clients[0]
+ sb = self.c0.nodemaker.storage_broker
+ self.servers = [(id, ss) for (id, ss) in sb.get_all_servers()]
+
+ data = upload.Data(immutable_plaintext, convergence="")
+ d = self.c0.upload(data)
+ def _uploaded(ur):
+ self.uri = ur.uri
+ self.shares = self.find_shares(self.uri)
+ d.addCallback(_uploaded)
+ return d
+
+ def test_10_good_sanity_check(self):
+ d = self._set_up("test_10_good_sanity_check")
+ d.addCallback(lambda ign: self.download_immutable())
+ return d
+
+ def test_3_good_7_hung(self):
+ d = self._set_up("test_3_good_7_hung")
+ d.addCallback(lambda ign: self._hang(self.servers[3:]))
+ d.addCallback(lambda ign: self.download_immutable())
+ return d
+
+ def test_3_good_7_noshares(self):
+ d = self._set_up("test_3_good_7_noshares")
+ d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
+ d.addCallback(lambda ign: self.download_immutable())
+ return d
+
+ def test_2_good_8_broken_fail(self):
+ d = self._set_up("test_2_good_8_broken_fail")
+ d.addCallback(lambda ign: self._break(self.servers[2:]))
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "test_2_good_8_broken_fail",
+ "Failed to get enough shareholders: have 2, need 3",
+ self.download_immutable))
+ return d
+
+ def test_2_good_8_noshares_fail(self):
+ d = self._set_up("test_2_good_8_noshares_fail")
+ d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[2:]))
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError, "test_2_good_8_noshares_fail",
+ "Failed to get enough shareholders: have 2, need 3",
+ self.download_immutable))
+ return d
+
+ def test_2_good_8_hung_then_1_recovers(self):
+ recovered = defer.Deferred()
+ d = self._set_up("test_2_good_8_hung_then_1_recovers")
+ d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
+ d.addCallback(lambda ign: self._hang(self.servers[3:]))
+ d.addCallback(lambda ign: self.download_immutable())
+ reactor.callLater(5, recovered.callback, None)
+ return d
+
+ def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
+ recovered = defer.Deferred()
+ d = self._set_up("test_2_good_8_hung_then_1_recovers_with_2_shares")
+ d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
+ d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
+ d.addCallback(lambda ign: self._hang(self.servers[3:]))
+ d.addCallback(lambda ign: self.download_immutable())
+ reactor.callLater(5, recovered.callback, None)
+ return d
+
+ def download_immutable(self):
+ n = self.c0.create_node_from_uri(self.uri)
+ d = download_to_data(n)
+ def _got_data(data):
+ self.failUnlessEqual(data, immutable_plaintext)
+ d.addCallback(_got_data)
+ return d
+
+ # unused
+ def download_mutable(self):
+ n = self.c0.create_node_from_uri(self.uri)
+ d = n.download_best_version()
+ def _got_data(data):
+ self.failUnlessEqual(data, mutable_plaintext)
+ d.addCallback(_got_data)
+ return d