]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
#538: fetch version and attach to the rref. Make IntroducerClient demand v1 support.
authorBrian Warner <warner@allmydata.com>
Sat, 22 Nov 2008 03:07:27 +0000 (20:07 -0700)
committerBrian Warner <warner@allmydata.com>
Sat, 22 Nov 2008 03:07:27 +0000 (20:07 -0700)
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/introducer/client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_introducer.py
src/allmydata/util/rrefutil.py [new file with mode: 0644]

index 8c219935f3c97815ad94ba56ff169c7efe0b001c..6ebcdc689db9f6025e562b0ae7f979b82e713856 100644 (file)
@@ -16,6 +16,7 @@ from allmydata import storage, hashtree, uri
 from allmydata.immutable import encode
 from allmydata.util import base32, idlib, mathutil
 from allmydata.util.assertutil import precondition
+from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, NotEnoughSharesError
 from allmydata.immutable import layout
@@ -1216,8 +1217,18 @@ class Uploader(service.MultiService):
                                       self._got_helper)
 
     def _got_helper(self, helper):
+        log.msg("got helper connection, getting versions")
+        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
+                    { },
+                    "application-version": "unknown: no get_version()",
+                    }
+        d = get_versioned_remote_reference(helper, default)
+        d.addCallback(self._got_versioned_helper)
+
+    def _got_versioned_helper(self, helper):
         self._helper = helper
         helper.notifyOnDisconnect(self._lost_helper)
+
     def _lost_helper(self):
         self._helper = None
 
@@ -1225,6 +1236,7 @@ class Uploader(service.MultiService):
         # return a tuple of (helper_furl_or_None, connected_bool)
         return (self._helper_furl, bool(self._helper))
 
+
     def upload(self, uploadable):
         # this returns the URI
         assert self.parent
index bcfbf626fcb83005356add671053b3ef89578171..47f7afbacb54ae91c7e9f44b0f1f2ee939535e10 100644 (file)
@@ -2156,3 +2156,11 @@ class RIKeyGenerator(RemoteInterface):
 class FileTooLargeError(Exception):
     pass
 
+
+class InsufficientVersionError(Exception):
+    def __init__(self, needed, got):
+        self.needed = needed
+        self.got = got
+    def __repr__(self):
+        return "InsufficientVersionError(need '%s', got %s)" % (self.needed,
+                                                                self.got)
index 085e0ccce16948a5482989f1c5da5345031ccfd0..8b59c9cca5b1313941740872d1db6eee3e311997 100644 (file)
@@ -4,9 +4,11 @@ from base64 import b32decode
 from zope.interface import implements
 from twisted.application import service
 from foolscap import Referenceable
+from allmydata.interfaces import InsufficientVersionError
 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
      IIntroducerClient
 from allmydata.util import log, idlib
+from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.introducer.common import make_index
 
 
@@ -28,6 +30,14 @@ class RemoteServiceConnector:
     @ivar remote_host: the IAddress, if connected, otherwise None
     """
 
+    VERSION_DEFAULTS = {
+        "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
+                     { "maximum-immutable-share-size": 2**32 },
+                     "application-version": "unknown: no get_version()",
+                     },
+        "stub_client": { },
+        }
+
     def __init__(self, announcement, tub, ic):
         self._tub = tub
         self._announcement = announcement
@@ -62,11 +72,19 @@ class RemoteServiceConnector:
         self._reconnector.stopConnecting()
 
     def _got_service(self, rref):
+        self.log("got connection to %s, getting versions" % self._nodeid_s)
+
+        default = self.VERSION_DEFAULTS.get(self.service_name, {})
+        d = get_versioned_remote_reference(rref, default)
+        d.addCallback(self._got_versioned_service)
+
+    def _got_versioned_service(self, rref):
+        self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))
+
         self.last_connect_time = time.time()
-        self.remote_host = rref.tracker.broker.transport.getPeer()
+        self.remote_host = rref.rref.tracker.broker.transport.getPeer()
 
         self.rref = rref
-        self.log("connected to %s" % self._nodeid_s)
 
         self._ic.add_connection(self._nodeid, self.service_name, rref)
 
@@ -79,6 +97,7 @@ class RemoteServiceConnector:
         self.remote_host = None
         self._ic.remove_connection(self._nodeid, self.service_name, rref)
 
+
     def reset(self):
         self._reconnector.reset()
 
@@ -119,6 +138,7 @@ class IntroducerClient(service.Service, Referenceable):
 
     def startService(self):
         service.Service.startService(self)
+        self._introducer_error = None
         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
         self._introducer_reconnector = rc
         def connect_failed(failure):
@@ -128,7 +148,25 @@ class IntroducerClient(service.Service, Referenceable):
         d.addErrback(connect_failed)
 
     def _got_introducer(self, publisher):
-        self.log("connected to introducer")
+        self.log("connected to introducer, getting versions")
+        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+                    { },
+                    "application-version": "unknown: no get_version()",
+                    }
+        d = get_versioned_remote_reference(publisher, default)
+        d.addCallback(self._got_versioned_introducer)
+        d.addErrback(self._got_error)
+
+    def _got_error(self, f):
+        # TODO: for the introducer, perhaps this should halt the application
+        self._introducer_error = f # polled by tests
+
+    def _got_versioned_introducer(self, publisher):
+        self.log("got introducer version: %s" % (publisher.version,))
+        # we require a V1 introducer
+        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
+        if needed not in publisher.version:
+            raise InsufficientVersionError(needed, publisher.version)
         self._connected = True
         self._publisher = publisher
         publisher.notifyOnDisconnect(self._disconnected)
@@ -258,7 +296,7 @@ class IntroducerClient(service.Service, Referenceable):
                           if c[1] == service_name])
 
     def get_permuted_peers(self, service_name, key):
-        """Return an ordered list of (peerid, rref) tuples."""
+        """Return an ordered list of (peerid, versioned-rref) tuples."""
 
         results = []
         for (c_peerid, c_service_name, rref) in self._connections:
index 2f61eab260b19946163d975d8c12098e344fac25..6568d07845348193fc6f6e12b85b78dd125aa1b8 100644 (file)
@@ -74,6 +74,15 @@ def flush_but_dont_ignore(res):
     d.addCallback(_done)
     return d
 
+def wait_a_few_turns(ignored=None):
+    d = eventual.fireEventually()
+    d.addCallback(eventual.fireEventually)
+    d.addCallback(eventual.fireEventually)
+    d.addCallback(eventual.fireEventually)
+    d.addCallback(eventual.fireEventually)
+    d.addCallback(eventual.fireEventually)
+    return d
+
 def upload_data(uploader, data, convergence):
     u = upload.Data(data, convergence=convergence)
     return uploader.upload(u)
@@ -110,10 +119,7 @@ class AssistedUpload(unittest.TestCase):
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)
 
-        # wait a few turns
-        d = eventual.fireEventually()
-        d.addCallback(eventual.fireEventually)
-        d.addCallback(eventual.fireEventually)
+        d = wait_a_few_turns()
 
         def _ready(res):
             assert u._helper
@@ -164,10 +170,7 @@ class AssistedUpload(unittest.TestCase):
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)
 
-        # wait a few turns
-        d = eventual.fireEventually()
-        d.addCallback(eventual.fireEventually)
-        d.addCallback(eventual.fireEventually)
+        d = wait_a_few_turns()
 
         def _ready(res):
             assert u._helper
@@ -194,10 +197,7 @@ class AssistedUpload(unittest.TestCase):
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)
 
-        # wait a few turns
-        d = eventual.fireEventually()
-        d.addCallback(eventual.fireEventually)
-        d.addCallback(eventual.fireEventually)
+        d = wait_a_few_turns()
 
         def _ready(res):
             assert u._helper
index 104e0cbd5a35cfbd0c10e0e94d97494538fe0ec7..760c4e0d5354ce783b55698f799e3445cf688b20 100644 (file)
@@ -9,6 +9,7 @@ from twisted.python import log
 from foolscap import Tub, Referenceable
 from foolscap.eventual import fireEventually, flushEventualQueue
 from twisted.application import service
+from allmydata.interfaces import InsufficientVersionError
 from allmydata.introducer.client import IntroducerClient
 from allmydata.introducer.server import IntroducerService
 # test compatibility with old introducer .tac files
@@ -230,3 +231,44 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 self.failIf(c.connected_to_introducer())
         d.addCallback(_check4)
         return d
+
+class TooNewServer(IntroducerService):
+    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
+                 { },
+                "application-version": "greetings from the crazy future",
+                }
+
+class NonV1Server(SystemTestMixin, unittest.TestCase):
+    # if the 1.3.0 client connects to a server that doesn't provide the 'v1'
+    # protocol, it is supposed to provide a useful error instead of a weird
+    # exception.
+
+    def test_failure(self):
+        i = TooNewServer()
+        i.setServiceParent(self.parent)
+        self.introducer_furl = self.central_tub.registerReference(i)
+
+        tub = Tub()
+        tub.setServiceParent(self.parent)
+        l = tub.listenOn("tcp:0")
+        portnum = l.getPortnum()
+        tub.setLocation("localhost:%d" % portnum)
+
+        n = FakeNode()
+        c = IntroducerClient(tub, self.introducer_furl,
+                             "nickname-client", "version", "oldest")
+        c.subscribe_to("storage")
+
+        c.setServiceParent(self.parent)
+
+        # now we wait for it to connect and notice the bad version
+
+        def _got_bad():
+            return bool(c._introducer_error) or bool(c._publisher)
+        d = self.poll(_got_bad)
+        def _done(res):
+            self.failUnless(c._introducer_error)
+            self.failUnless(c._introducer_error.check(InsufficientVersionError))
+        d.addCallback(_done)
+        return d
+
diff --git a/src/allmydata/util/rrefutil.py b/src/allmydata/util/rrefutil.py
new file mode 100644 (file)
index 0000000..9a567eb
--- /dev/null
@@ -0,0 +1,32 @@
+
+from foolscap.tokens import Violation
+
+class VersionedRemoteReference:
+    """I wrap a RemoteReference, and add a .version attribute."""
+
+    def __init__(self, original, version):
+        self.rref = original
+        self.version = version
+
+    def callRemote(self, *args, **kwargs):
+        return self.rref.callRemote(*args, **kwargs)
+
+    def callRemoteOnly(self, *args, **kwargs):
+        return self.rref.callRemoteOnly(*args, **kwargs)
+
+    def notifyOnDisconnect(self, *args, **kwargs):
+        return self.rref.notifyOnDisconnect(*args, **kwargs)
+
+
+def get_versioned_remote_reference(rref, default):
+    """I return a Deferred that fires with a VersionedRemoteReference"""
+    d = rref.callRemote("get_version")
+    def _no_get_version(f):
+        f.trap(Violation, AttributeError)
+        return default
+    d.addErrback(_no_get_version)
+    def _got_version(version):
+        return VersionedRemoteReference(rref, version)
+    d.addCallback(_got_version)
+    return d
+