From: Brian Warner Date: Sat, 22 Nov 2008 03:07:27 +0000 (-0700) Subject: #538: fetch version and attach to the rref. Make IntroducerClient demand v1 support. X-Git-Url: https://git.rkrishnan.org/simplejson/components/%22doc.html/index.php?a=commitdiff_plain;h=bf06492a90a3cc496d188017f4a364679d1259fb;p=tahoe-lafs%2Ftahoe-lafs.git #538: fetch version and attach to the rref. Make IntroducerClient demand v1 support. --- diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 8c219935..6ebcdc68 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -16,6 +16,7 @@ from allmydata import storage, hashtree, uri from allmydata.immutable import encode from allmydata.util import base32, idlib, mathutil from allmydata.util.assertutil import precondition +from allmydata.util.rrefutil import get_versioned_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, NotEnoughSharesError from allmydata.immutable import layout @@ -1216,8 +1217,18 @@ class Uploader(service.MultiService): self._got_helper) def _got_helper(self, helper): + log.msg("got helper connection, getting versions") + default = { "http://allmydata.org/tahoe/protocols/helper/v1" : + { }, + "application-version": "unknown: no get_version()", + } + d = get_versioned_remote_reference(helper, default) + d.addCallback(self._got_versioned_helper) + + def _got_versioned_helper(self, helper): self._helper = helper helper.notifyOnDisconnect(self._lost_helper) + def _lost_helper(self): self._helper = None @@ -1225,6 +1236,7 @@ class Uploader(service.MultiService): # return a tuple of (helper_furl_or_None, connected_bool) return (self._helper_furl, bool(self._helper)) + def upload(self, uploadable): # this returns the URI assert self.parent diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index bcfbf626..47f7afba 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -2156,3 +2156,11 @@ class RIKeyGenerator(RemoteInterface): class FileTooLargeError(Exception): pass + +class InsufficientVersionError(Exception): + def __init__(self, needed, got): + self.needed = needed + self.got = got + def __repr__(self): + return "InsufficientVersionError(need '%s', got %s)" % (self.needed, + self.got) diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index 085e0ccc..8b59c9cc 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -4,9 +4,11 @@ from base64 import b32decode from zope.interface import implements from twisted.application import service from foolscap import Referenceable +from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ IIntroducerClient from allmydata.util import log, idlib +from allmydata.util.rrefutil import get_versioned_remote_reference from allmydata.introducer.common import make_index @@ -28,6 +30,14 @@ class RemoteServiceConnector: @ivar remote_host: the IAddress, if connected, otherwise None """ + VERSION_DEFAULTS = { + "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" : + { "maximum-immutable-share-size": 2**32 }, + "application-version": "unknown: no get_version()", + }, + "stub_client": { }, + } + def __init__(self, announcement, tub, ic): self._tub = tub self._announcement = announcement @@ -62,11 +72,19 @@ class RemoteServiceConnector: self._reconnector.stopConnecting() def _got_service(self, rref): + self.log("got connection to %s, getting versions" % self._nodeid_s) + + default = self.VERSION_DEFAULTS.get(self.service_name, {}) + d = get_versioned_remote_reference(rref, default) + d.addCallback(self._got_versioned_service) + + def _got_versioned_service(self, rref): + self.log("connected to %s, version %s" % (self._nodeid_s, rref.version)) + self.last_connect_time = time.time() - self.remote_host = rref.tracker.broker.transport.getPeer() + self.remote_host = rref.rref.tracker.broker.transport.getPeer() self.rref = rref - self.log("connected to %s" % self._nodeid_s) self._ic.add_connection(self._nodeid, self.service_name, rref) @@ -79,6 +97,7 @@ class RemoteServiceConnector: self.remote_host = None self._ic.remove_connection(self._nodeid, self.service_name, rref) + def reset(self): self._reconnector.reset() @@ -119,6 +138,7 @@ class IntroducerClient(service.Service, Referenceable): def startService(self): service.Service.startService(self) + self._introducer_error = None rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) self._introducer_reconnector = rc def connect_failed(failure): @@ -128,7 +148,25 @@ class IntroducerClient(service.Service, Referenceable): d.addErrback(connect_failed) def _got_introducer(self, publisher): - self.log("connected to introducer") + self.log("connected to introducer, getting versions") + default = { "http://allmydata.org/tahoe/protocols/introducer/v1": + { }, + "application-version": "unknown: no get_version()", + } + d = get_versioned_remote_reference(publisher, default) + d.addCallback(self._got_versioned_introducer) + d.addErrback(self._got_error) + + def _got_error(self, f): + # TODO: for the introducer, perhaps this should halt the application + self._introducer_error = f # polled by tests + + def _got_versioned_introducer(self, publisher): + self.log("got introducer version: %s" % (publisher.version,)) + # we require a V1 introducer + needed = "http://allmydata.org/tahoe/protocols/introducer/v1" + if needed not in publisher.version: + raise InsufficientVersionError(needed, publisher.version) self._connected = True self._publisher = publisher publisher.notifyOnDisconnect(self._disconnected) @@ -258,7 +296,7 @@ class IntroducerClient(service.Service, Referenceable): if c[1] == service_name]) def get_permuted_peers(self, service_name, key): - """Return an ordered list of (peerid, rref) tuples.""" + """Return an ordered list of (peerid, versioned-rref) tuples.""" results = [] for (c_peerid, c_service_name, rref) in self._connections: diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 2f61eab2..6568d078 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -74,6 +74,15 @@ def flush_but_dont_ignore(res): d.addCallback(_done) return d +def wait_a_few_turns(ignored=None): + d = eventual.fireEventually() + d.addCallback(eventual.fireEventually) + d.addCallback(eventual.fireEventually) + d.addCallback(eventual.fireEventually) + d.addCallback(eventual.fireEventually) + d.addCallback(eventual.fireEventually) + return d + def upload_data(uploader, data, convergence): u = upload.Data(data, convergence=convergence) return uploader.upload(u) @@ -110,10 +119,7 @@ class AssistedUpload(unittest.TestCase): u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) - # wait a few turns - d = eventual.fireEventually() - d.addCallback(eventual.fireEventually) - d.addCallback(eventual.fireEventually) + d = wait_a_few_turns() def _ready(res): assert u._helper @@ -164,10 +170,7 @@ class AssistedUpload(unittest.TestCase): u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) - # wait a few turns - d = eventual.fireEventually() - d.addCallback(eventual.fireEventually) - d.addCallback(eventual.fireEventually) + d = wait_a_few_turns() def _ready(res): assert u._helper @@ -194,10 +197,7 @@ class AssistedUpload(unittest.TestCase): u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) - # wait a few turns - d = eventual.fireEventually() - d.addCallback(eventual.fireEventually) - d.addCallback(eventual.fireEventually) + d = wait_a_few_turns() def _ready(res): assert u._helper diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 104e0cbd..760c4e0d 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -9,6 +9,7 @@ from twisted.python import log from foolscap import Tub, Referenceable from foolscap.eventual import fireEventually, flushEventualQueue from twisted.application import service +from allmydata.interfaces import InsufficientVersionError from allmydata.introducer.client import IntroducerClient from allmydata.introducer.server import IntroducerService # test compatibility with old introducer .tac files @@ -230,3 +231,44 @@ class SystemTest(SystemTestMixin, unittest.TestCase): self.failIf(c.connected_to_introducer()) d.addCallback(_check4) return d + +class TooNewServer(IntroducerService): + VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": + { }, + "application-version": "greetings from the crazy future", + } + +class NonV1Server(SystemTestMixin, unittest.TestCase): + # if the 1.3.0 client connects to a server that doesn't provide the 'v1' + # protocol, it is supposed to provide a useful error instead of a weird + # exception. + + def test_failure(self): + i = TooNewServer() + i.setServiceParent(self.parent) + self.introducer_furl = self.central_tub.registerReference(i) + + tub = Tub() + tub.setServiceParent(self.parent) + l = tub.listenOn("tcp:0") + portnum = l.getPortnum() + tub.setLocation("localhost:%d" % portnum) + + n = FakeNode() + c = IntroducerClient(tub, self.introducer_furl, + "nickname-client", "version", "oldest") + c.subscribe_to("storage") + + c.setServiceParent(self.parent) + + # now we wait for it to connect and notice the bad version + + def _got_bad(): + return bool(c._introducer_error) or bool(c._publisher) + d = self.poll(_got_bad) + def _done(res): + self.failUnless(c._introducer_error) + self.failUnless(c._introducer_error.check(InsufficientVersionError)) + d.addCallback(_done) + return d + diff --git a/src/allmydata/util/rrefutil.py b/src/allmydata/util/rrefutil.py new file mode 100644 index 00000000..9a567ebb --- /dev/null +++ b/src/allmydata/util/rrefutil.py @@ -0,0 +1,32 @@ + +from foolscap.tokens import Violation + +class VersionedRemoteReference: + """I wrap a RemoteReference, and add a .version attribute.""" + + def __init__(self, original, version): + self.rref = original + self.version = version + + def callRemote(self, *args, **kwargs): + return self.rref.callRemote(*args, **kwargs) + + def callRemoteOnly(self, *args, **kwargs): + return self.rref.callRemoteOnly(*args, **kwargs) + + def notifyOnDisconnect(self, *args, **kwargs): + return self.rref.notifyOnDisconnect(*args, **kwargs) + + +def get_versioned_remote_reference(rref, default): + """I return a Deferred that fires with a VersionedRemoteReference""" + d = rref.callRemote("get_version") + def _no_get_version(f): + f.trap(Violation, AttributeError) + return default + d.addErrback(_no_get_version) + def _got_version(version): + return VersionedRemoteReference(rref, version) + d.addCallback(_got_version) + return d +