big introducer refactoring: separate publish+subscribe. Addresses #271.
authorBrian Warner <warner@allmydata.com>
Tue, 5 Feb 2008 20:05:13 +0000 (13:05 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 5 Feb 2008 20:05:13 +0000 (13:05 -0700)
16 files changed:
src/allmydata/checker.py
src/allmydata/client.py
src/allmydata/control.py
src/allmydata/download.py
src/allmydata/interfaces.py
src/allmydata/introducer.py
src/allmydata/mutable.py
src/allmydata/offloaded.py
src/allmydata/storage.py
src/allmydata/test/test_client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_introducer.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py

index 66874013734ddc5a83ae72656b0dcd19c99364c3..0f47cad5a30475471ca2487d870c8a0c8132a342 100644 (file)
@@ -30,7 +30,7 @@ class SimpleCHKFileChecker:
         # messages (or if we used promises).
         found = set()
         for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
-            buckets = connection.get_service("storageserver").get_buckets(si)
+            buckets = connection.get_buckets(si)
             found.update(buckets.keys())
         return len(found)
     '''
@@ -42,10 +42,8 @@ class SimpleCHKFileChecker:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
-            d = connection.callRemote("get_service", "storageserver")
-            d.addCallback(lambda ss: ss.callRemote("get_buckets",
-                                                   storage_index))
+        for (peerid, ss) in self.peer_getter("storage", storage_index):
+            d = ss.callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
             dl.append(d)
index 6209abca4e88a45d0c1b972ad68ca469d58b5ab2..3efc6bb10d12eb75c4f0f1b17ebdbcb073efc090 100644 (file)
@@ -1,8 +1,6 @@
 
-import os, sha, stat, time, re
-from foolscap import Referenceable
-from zope.interface import implements
-from allmydata.interfaces import RIClient
+import os, stat, time, re
+from allmydata.interfaces import RIStorageServer
 from allmydata import node
 
 from twisted.internet import reactor
@@ -31,8 +29,7 @@ GiB=1024*MiB
 TiB=1024*GiB
 PiB=1024*TiB
 
-class Client(node.Node, Referenceable, testutil.PollMixin):
-    implements(RIClient)
+class Client(node.Node, testutil.PollMixin):
     PORTNUMFILE = "client.port"
     STOREDIR = 'storage'
     NODETYPE = "client"
@@ -46,17 +43,19 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
     # that we will abort an upload unless we can allocate space for at least
     # this many. 'total' is the total number of shares created by encoding.
     # If everybody has room then this is is how many we will upload.
-    DEFAULT_ENCODING_PARAMETERS = {"k":25,
-                                   "happy": 75,
-                                   "n": 100,
+    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
+                                   "happy": 7,
+                                   "n": 10,
                                    "max_segment_size": 1*MiB,
                                    }
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
         self.logSource="Client"
-        self.my_furl = None
-        self.introducer_client = None
+        self.nickname = self.get_config("nickname")
+        if self.nickname is None:
+            self.nickname = "<unspecified>"
+        self.init_introducer_client()
         self.init_stats_provider()
         self.init_lease_secret()
         self.init_storage()
@@ -67,8 +66,6 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.add_service(Checker())
         # ControlServer and Helper are attached after Tub startup
 
-        self.introducer_furl = self.get_config("introducer.furl", required=True)
-
         hotline_file = os.path.join(self.basedir,
                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
         if os.path.exists(hotline_file):
@@ -81,6 +78,17 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         if webport:
             self.init_web(webport) # strports string
 
+    def init_introducer_client(self):
+        self.introducer_furl = self.get_config("introducer.furl", required=True)
+        ic = IntroducerClient(self.tub, self.introducer_furl,
+                              self.nickname,
+                              str(allmydata.__version__),
+                              str(self.OLDEST_SUPPORTED_VERSION))
+        self.introducer_client = ic
+        ic.setServiceParent(self)
+        # nodes that want to upload and download will need storage servers
+        ic.subscribe_to("storage")
+
     def init_stats_provider(self):
         gatherer_furl = self.get_config('stats_gatherer.furl')
         if gatherer_furl:
@@ -96,6 +104,12 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self._lease_secret = idlib.a2b(secret_s)
 
     def init_storage(self):
+        # should we run a storage server (and publish it for others to use)?
+        provide_storage = (self.get_config("no_storage") is None)
+        if not provide_storage:
+            return
+        readonly_storage = (self.get_config("readonly_storage") is not None)
+
         storedir = os.path.join(self.basedir, self.STOREDIR)
         sizelimit = None
 
@@ -115,8 +129,21 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
                               "G": 1000 * 1000 * 1000,
                               }[suffix]
                 sizelimit = int(number) * multiplier
-        no_storage = self.get_config("debug_no_storage") is not None
-        self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider))
+        discard_storage = self.get_config("debug_discard_storage") is not None
+        ss = StorageServer(storedir, sizelimit,
+                           discard_storage, readonly_storage,
+                           self.stats_provider)
+        self.add_service(ss)
+        d = self.when_tub_ready()
+        # we can't do registerReference until the Tub is ready
+        def _publish(res):
+            furl_file = os.path.join(self.basedir, "private", "storage.furl")
+            furl = self.tub.registerReference(ss, furlFile=furl_file)
+            ri_name = RIStorageServer.__remote_name__
+            self.introducer_client.publish(furl, "storage", ri_name)
+        d.addCallback(_publish)
+        d.addErrback(log.err, facility="tahoe.storage", level=log.BAD)
+
 
     def init_options(self):
         self.push_to_ourselves = None
@@ -148,20 +175,10 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.log("tub_ready")
         node.Node.tub_ready(self)
 
-        furl_file = os.path.join(self.basedir, "myself.furl")
-        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
-
-        # should we publish ourselves as a server?
-        provide_storage = (self.get_config("no_storage") is None)
-        if provide_storage:
-            my_furl = self.my_furl
-        else:
-            my_furl = None
-
-        ic = IntroducerClient(self.tub, self.introducer_furl, my_furl)
-        self.introducer_client = ic
-        ic.setServiceParent(self)
-
+        # TODO: replace register_control() with an init_control() that
+        # internally uses self.when_tub_ready() to stall registerReference.
+        # Do the same for register_helper(). That will remove the need for
+        # this tub_ready() method.
         self.register_control()
         self.register_helper()
 
@@ -185,43 +202,22 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         helper_furlfile = os.path.join(self.basedir, "private", "helper.furl")
         self.tub.registerReference(h, furlFile=helper_furlfile)
 
-    def remote_get_versions(self):
-        return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)
-
-    def remote_get_service(self, name):
-        if name in ("storageserver",):
-            return self.getServiceNamed(name)
-        raise RuntimeError("I am unwilling to give you service %s" % name)
-
-    def remote_get_nodeid(self):
-        return self.nodeid
-
     def get_all_peerids(self):
-        if not self.introducer_client:
-            return []
         return self.introducer_client.get_all_peerids()
 
-    def get_permuted_peers(self, key, include_myself=True):
+    def get_permuted_peers(self, service_name, key):
         """
-        @return: list of (permuted-peerid, peerid, connection,)
+        @return: list of (peerid, connection,)
         """
-        results = []
-        for peerid, connection in self.introducer_client.get_all_peers():
-            assert isinstance(peerid, str)
-            if not include_myself and peerid == self.nodeid:
-                self.log("get_permuted_peers: removing myself from the list")
-                continue
-            permuted = sha.new(key + peerid).digest()
-            results.append((permuted, peerid, connection))
-        results.sort()
-        return results
+        assert isinstance(service_name, str)
+        assert isinstance(key, str)
+        return self.introducer_client.get_permuted_peers(service_name, key)
 
     def get_push_to_ourselves(self):
         return self.push_to_ourselves
 
     def get_encoding_parameters(self):
-        if not self.introducer_client:
-            return self.DEFAULT_ENCODING_PARAMETERS
+        return self.DEFAULT_ENCODING_PARAMETERS
         p = self.introducer_client.encoding_parameters # a tuple
         # TODO: make the 0.7.1 introducer publish a dict instead of a tuple
         params = {"k": p[0],
index 2428948b92543751cb5ef5725ada8ddac3ee397f..6e7fb91075f149d74a9ae88be4b28faf625925aa 100644 (file)
@@ -69,7 +69,8 @@ class ControlServer(Referenceable, service.Service, testutil.PollMixin):
         # phase to take more than 10 seconds. Expect worst-case latency to be
         # 300ms.
         results = {}
-        everyone = list(self.parent.introducer_client.get_all_peers())
+        conns = self.parent.introducer_client.get_all_connections_for("storage")
+        everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
         num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
         everyone = everyone * num_pings
         d = self._do_one_ping(None, everyone, results)
@@ -79,7 +80,7 @@ class ControlServer(Referenceable, service.Service, testutil.PollMixin):
             return results
         peerid, connection = everyone_left.pop(0)
         start = time.time()
-        d = connection.callRemote("get_nodeid")
+        d = connection.callRemote("get_versions")
         def _done(ignored):
             stop = time.time()
             elapsed = stop - start
index 28810a5fb2c36aa217d248968654318c89bb18c5..592ae42f32d267b3255aca29940e5a617afe2a95 100644 (file)
@@ -412,17 +412,14 @@ class FileDownloader:
 
     def _get_all_shareholders(self):
         dl = []
-        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._storage_index):
-            d = connection.callRemote("get_service", "storageserver")
-            d.addCallback(lambda ss: ss.callRemote("get_buckets",
-                                                   self._storage_index))
-            d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(connection,))
+        for (peerid,ss) in self._client.get_permuted_peers("storage",
+                                                           self._storage_index):
+            d = ss.callRemote("get_buckets", self._storage_index)
+            d.addCallbacks(self._got_response, self._got_error)
             dl.append(d)
         return defer.DeferredList(dl)
 
-    def _got_response(self, buckets, connection):
-        _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
+    def _got_response(self, buckets):
         for sharenum, bucket in buckets.iteritems():
             b = storage.ReadBucketProxy(bucket)
             self.add_share_bucket(sharenum, b)
index 84a3d5e6daba07e05fd6d239bcd0b1efaeaea87a..b9e50abc1513c0db038ffd40d4f2f6b19fb8295d 100644 (file)
@@ -22,10 +22,26 @@ URIExtensionData = StringConstraint(1000)
 LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
 LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
 
-
-class RIIntroducerClient(RemoteInterface):
-    def new_peers(furls=SetOf(FURL)):
+# Announcements are (FURL, service_name, remoteinterface_name,
+#                    nickname, my_version, oldest_supported)
+#  the (FURL, service_name, remoteinterface_name) refer to the service being
+#  announced. The (nickname, my_version, oldest_supported) refer to the
+#  client as a whole. The my_version/oldest_supported strings can be parsed
+#  by an allmydata.util.version.Version instance, and then compared. The
+#  first goal is to make sure that nodes are not confused by speaking to an
+#  incompatible peer. The second goal is to enable the development of
+#  backwards-compatibility code.
+
+Announcement = TupleOf(FURL, str, str,
+                       str, str, str)
+
+class RIIntroducerSubscriberClient(RemoteInterface):
+    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+
+    def announce(announcements=SetOf(Announcement)):
+        """I accept announcements from the publisher."""
         return None
+
     def set_encoding_parameters(parameters=(int, int, int)):
         """Advise the client of the recommended k-of-n encoding parameters
         for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
@@ -43,28 +59,103 @@ class RIIntroducerClient(RemoteInterface):
         """
         return None
 
-class RIIntroducer(RemoteInterface):
-    def hello(node=RIIntroducerClient, furl=ChoiceOf(FURL, None)):
+# When Foolscap can handle multiple interfaces (Foolscap#17), the
+# full-powered introducer will implement both RIIntroducerPublisher and
+# RIIntroducerSubscriberService. Until then, we define
+# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
+# make everybody use that.
+
+class RIIntroducerPublisher(RemoteInterface):
+    """To publish a service to the world, connect to me and give me your
+    announcement message. I will deliver a copy to all connected subscribers."""
+    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
+
+    def publish(announcement=Announcement):
+        # canary?
         return None
 
-class RIClient(RemoteInterface):
-    def get_versions():
-        """Return a tuple of (my_version, oldest_supported) strings.
+class RIIntroducerSubscriberService(RemoteInterface):
+    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
 
-        Each string can be parsed by an allmydata.util.version.Version
-        instance, and then compared. The first goal is to make sure that
-        nodes are not confused by speaking to an incompatible peer. The
-        second goal is to enable the development of backwards-compatibility
-        code.
+    def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+        """Give me a subscriber reference, and I will call its new_peers()
+        method will any announcements that match the desired service name. I
+        will ignore duplicate subscriptions.
+        """
+        return None
 
-        This method is likely to change in incompatible ways until we get the
-        whole compatibility scheme nailed down.
+class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
+    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+    def publish(announcement=Announcement):
+        return None
+    def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+        return None
+
+class IIntroducerClient(Interface):
+    """I provide service introduction facilities for a node. I help nodes
+    publish their services to the rest of the world, and I help them learn
+    about services available on other nodes."""
+
+    def publish(furl, service_name, remoteinterface_name):
+        """Once you call this, I will tell the world that the Referenceable
+        available at FURL is available to provide a service named
+        SERVICE_NAME. The precise definition of the service being provided is
+        identified by the Foolscap 'remote interface name' in the last
+        parameter: this is supposed to be a globally-unique string that
+        identifies the RemoteInterface that is implemented."""
+
+    def subscribe_to(service_name):
+        """Call this if you will eventually want to use services with the
+        given SERVICE_NAME. This will prompt me to subscribe to announcements
+        of those services. You can pick up the announcements later by calling
+        get_all_connections_for() or get_permuted_peers().
         """
-        return TupleOf(str, str)
-    def get_service(name=str):
-        return Referenceable
-    def get_nodeid():
-        return Nodeid
+
+    def get_all_connections():
+        """Return a frozenset of (nodeid, service_name, rref) tuples, one for
+        each active connection we've established to a remote service. This is
+        mostly useful for unit tests that need to wait until a certain number
+        of connections have been made."""
+
+    def get_all_connectors():
+        """Return a dict that maps from (nodeid, service_name) to a
+        RemoteServiceConnector instance for all services that we are actively
+        trying to connect to. Each RemoteServiceConnector has the following
+        public attributes::
+
+          announcement_time: when we first heard about this service
+          last_connect_time: when we last established a connection
+          last_loss_time: when we last lost a connection
+
+          version: the peer's version, from the most recent connection
+          oldest_supported: the peer's oldest supported version, same
+
+          rref: the RemoteReference, if connected, otherwise None
+          remote_host: the IAddress, if connected, otherwise None
+
+        This method is intended for monitoring interfaces, such as a web page
+        which describes connecting and connected peers.
+        """
+
+    def get_all_peerids():
+        """Return a frozenset of all peerids to whom we have a connection (to
+        one or more services) established. Mostly useful for unit tests."""
+
+    def get_all_connections_for(service_name):
+        """Return a frozenset of (nodeid, service_name, rref) tuples, one
+        for each active connection that provides the given SERVICE_NAME."""
+
+    def get_permuted_peers(service_name, key):
+        """Returns an ordered list of (peerid, rref) tuples, selecting from
+        the connections that provide SERVICE_NAME, using a hash-based
+        permutation keyed by KEY. This randomizes the service list in a
+        repeatable way, to distribute load over many peers.
+        """
+
+    def connected_to_introducer():
+        """Returns a boolean, True if we are currently connected to the
+        introducer, False if not."""
+
 
 class RIBucketWriter(RemoteInterface):
     def write(offset=int, data=ShareData):
@@ -103,6 +194,21 @@ ReadData = ListOf(ShareData)
 # returns data[offset:offset+length] for each element of TestVector
 
 class RIStorageServer(RemoteInterface):
+    __remote_name__ = "RIStorageServer.tahoe.allmydata.com"
+
+    def get_versions():
+        """Return a tuple of (my_version, oldest_supported) strings.
+        Each string can be parsed by an allmydata.util.version.Version
+        instance, and then compared. The first goal is to make sure that
+        nodes are not confused by speaking to an incompatible peer. The
+        second goal is to enable the development of backwards-compatibility
+        code.
+
+        This method is likely to change in incompatible ways until we get the
+        whole compatibility scheme nailed down.
+        """
+        return TupleOf(str, str)
+
     def allocate_buckets(storage_index=StorageIndex,
                          renew_secret=LeaseRenewSecret,
                          cancel_secret=LeaseCancelSecret,
index c8e08d2567f74fb1eda89fada346864f99890472..33319864ff57281ef05b9f5a1064cf65149bb37c 100644 (file)
@@ -1,14 +1,14 @@
 
-import re
-from base64 import b32encode, b32decode
+import re, time, sha
+from base64 import b32decode
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
-from twisted.python import log
 from foolscap import Referenceable
 from allmydata import node
-from allmydata.interfaces import RIIntroducer, RIIntroducerClient
-from allmydata.util import observer
+from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
+     RIIntroducerSubscriberClient, IIntroducerClient
+from allmydata.util import observer, log, idlib
 
 class IntroducerNode(node.Node):
     PORTNUMFILE = "introducer.port"
@@ -29,49 +29,65 @@ class IntroducerNode(node.Node):
         self.write_config("introducer.furl", self.introducer_url + "\n")
 
 class IntroducerService(service.MultiService, Referenceable):
-    implements(RIIntroducer)
+    implements(RIIntroducerPublisherAndSubscriberService)
     name = "introducer"
 
     def __init__(self, basedir=".", encoding_parameters=None):
         service.MultiService.__init__(self)
         self.introducer_url = None
-        self.nodes = set()
-        self.furls = set()
+        self._announcements = set()
+        self._subscribers = {}
         self._encoding_parameters = encoding_parameters
 
-    def remote_hello(self, node, furl):
-        log.msg("introducer: new contact at %s, node is %s" % (furl, node))
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+    def remote_publish(self, announcement):
+        self.log("introducer: announcement published: %s" % (announcement,) )
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+        if announcement in self._announcements:
+            self.log("but we already knew it, ignoring", level=log.NOISY)
+            return
+        self._announcements.add(announcement)
+        for s in self._subscribers.get(service_name, []):
+            s.callRemote("announce", set([announcement]))
+
+    def remote_subscribe(self, subscriber, service_name):
+        self.log("introducer: subscription[%s] request at %s" % (service_name,
+                                                                 subscriber))
+        if service_name not in self._subscribers:
+            self._subscribers[service_name] = set()
+        subscribers = self._subscribers[service_name]
+        if subscriber in subscribers:
+            self.log("but they're already subscribed, ignoring",
+                     level=log.UNUSUAL)
+            return
+        subscribers.add(subscriber)
         def _remove():
-            log.msg(" introducer: removing %s %s" % (node, furl))
-            self.nodes.remove(node)
-            if furl is not None:
-                self.furls.remove(furl)
-        node.notifyOnDisconnect(_remove)
-        if furl is not None:
-            self.furls.add(furl)
-            for othernode in self.nodes:
-                othernode.callRemote("new_peers", set([furl]))
-        node.callRemote("new_peers", self.furls)
-        if self._encoding_parameters is not None:
-            node.callRemote("set_encoding_parameters",
-                            self._encoding_parameters)
-        self.nodes.add(node)
+            self.log("introducer: unsubscribing[%s] %s" % (service_name,
+                                                           subscriber))
+            subscribers.remove(subscriber)
+        subscriber.notifyOnDisconnect(_remove)
 
-class IntroducerClient(service.Service, Referenceable):
-    implements(RIIntroducerClient)
+        announcements = set( [ a
+                               for a in self._announcements
+                               if a[1] == service_name ] )
+        d = subscriber.callRemote("announce", announcements)
+        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
 
-    def __init__(self, tub, introducer_furl, my_furl):
-        self.tub = tub
-        self.introducer_furl = introducer_furl
-        self.my_furl = my_furl
+        def UNKNOWN(): # TODO
+            if self._encoding_parameters is not None:
+                node.callRemote("set_encoding_parameters",
+                                self._encoding_parameters)
 
-        self.connections = {} # k: nodeid, v: ref
-        self.reconnectors = {} # k: FURL, v: reconnector
-        self._connected = False
 
-        self.connection_observers = observer.ObserverList()
-        self.encoding_parameters = None
+class PeerCountObserver:
+    # This is used by unit test code to wait until peer connections have been
+    # established.
 
+    def __init__(self):
         # The N'th element of _observers_of_enough_peers is None if nobody has
         # asked to be informed when N peers become connected, it is a
         # OneShotObserverList if someone has asked to be informed, and that list
@@ -92,33 +108,7 @@ class IntroducerClient(service.Service, Referenceable):
         # interested in (i.e., there are never trailing Nones in
         # _observers_of_fewer_than_peers).
         self._observers_of_fewer_than_peers = []
-
-    def startService(self):
-        service.Service.startService(self)
-        self.introducer_reconnector = self.tub.connectTo(self.introducer_furl,
-                                                         self._got_introducer)
-        def connect_failed(failure):
-            self.log("\n\nInitial Introducer connection failed: "
-                     "perhaps it's down\n")
-            self.log(str(failure))
-        d = self.tub.getReference(self.introducer_furl)
-        d.addErrback(connect_failed)
-
-    def log(self, msg):
-        self.parent.log(msg)
-
-    def remote_new_peers(self, furls):
-        for furl in furls:
-            self._new_peer(furl)
-
-    def remote_set_encoding_parameters(self, parameters):
-        self.encoding_parameters = parameters
-
-    def stopService(self):
-        service.Service.stopService(self)
-        self.introducer_reconnector.stopConnecting()
-        for reconnector in self.reconnectors.itervalues():
-            reconnector.stopConnecting()
+        self.connection_observers = observer.ObserverList()
 
     def _notify_observers_of_enough_peers(self, numpeers):
         if len(self._observers_of_enough_peers) > numpeers:
@@ -141,72 +131,6 @@ class IntroducerClient(service.Service, Referenceable):
                 while len(self._observers_of_fewer_than_peers) > numpeers and (not self._observers_of_fewer_than_peers[-1]):
                     self._observers_of_fewer_than_peers.pop()
 
-    def _new_peer(self, furl):
-        if furl in self.reconnectors:
-            return
-        # TODO: rather than using the TubID as a nodeid, we should use
-        # something else. The thing that requires the least additional
-        # mappings is to use the foolscap "identifier" (the last component of
-        # the furl), since these are unguessable. Before we can do that,
-        # though, we need a way to conveniently make these identifiers
-        # persist from one run of the client program to the next. Also, using
-        # the foolscap identifier would mean that anyone who knows the name
-        # of the node also has all the secrets they need to contact and use
-        # them, which may or may not be what we want.
-        m = re.match(r'pb://(\w+)@', furl)
-        assert m
-        nodeid = b32decode(m.group(1).upper())
-        def _got_peer(rref):
-            self.log("connected to %s" % b32encode(nodeid).lower()[:8])
-            self.connection_observers.notify(nodeid, rref)
-            self.connections[nodeid] = rref
-            self._notify_observers_of_enough_peers(len(self.connections))
-            self._notify_observers_of_fewer_than_peers(len(self.connections))
-            def _lost():
-                # TODO: notifyOnDisconnect uses eventually(), but connects do
-                # not. Could this cause a problem?
-
-                # We know that this observer list must have been fired, since we
-                # had enough peers before this one was lost.
-                self._remove_observers_of_enough_peers(len(self.connections))
-                self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
-
-                del self.connections[nodeid]
-
-            rref.notifyOnDisconnect(_lost)
-        self.log("connecting to %s" % b32encode(nodeid).lower()[:8])
-        self.reconnectors[furl] = self.tub.connectTo(furl, _got_peer)
-
-    def _got_introducer(self, introducer):
-        if self.my_furl:
-            my_furl_s = self.my_furl[6:13]
-        else:
-            my_furl_s = "<none>"
-        self.log("introducing ourselves: %s, %s" % (self, my_furl_s))
-        self._connected = True
-        d = introducer.callRemote("hello",
-                                  node=self,
-                                  furl=self.my_furl)
-        introducer.notifyOnDisconnect(self._disconnected)
-
-    def _disconnected(self):
-        self.log("bummer, we've lost our connection to the introducer")
-        self._connected = False
-
-    def notify_on_new_connection(self, cb):
-        """Register a callback that will be fired (with nodeid, rref) when
-        a new connection is established."""
-        self.connection_observers.subscribe(cb)
-
-    def connected_to_introducer(self):
-        return self._connected
-
-    def get_all_peerids(self):
-        return self.connections.iterkeys()
-
-    def get_all_peers(self):
-        return self.connections.iteritems()
-
     def when_enough_peers(self, numpeers):
         """
         I return a deferred that fires the next time that at least
@@ -233,3 +157,297 @@ class IntroducerClient(service.Service, Referenceable):
             if not self._observers_of_fewer_than_peers[numpeers]:
                 self._observers_of_fewer_than_peers[numpeers] = observer.OneShotObserverList()
             return self._observers_of_fewer_than_peers[numpeers].when_fired()
+
+    def notify_on_new_connection(self, cb):
+        """Register a callback that will be fired (with nodeid, rref) when
+        a new connection is established."""
+        self.connection_observers.subscribe(cb)
+
+    def add_peer(self, ann):
+        self._notify_observers_of_enough_peers(len(self.connections))
+        self._notify_observers_of_fewer_than_peers(len(self.connections))
+
+    def remove_peer(self, ann):
+        self._remove_observers_of_enough_peers(len(self.connections))
+        self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
+
+
+
+class RemoteServiceConnector:
+    """I hold information about a peer service that we want to connect to. If
+    we are connected, I hold the RemoteReference, the peer's address, and the
+    peer's version information. I remember information about when we were
+    last connected to the peer too, even if we aren't currently connected.
+
+    @ivar announcement_time: when we first heard about this service
+    @ivar last_connect_time: when we last established a connection
+    @ivar last_loss_time: when we last lost a connection
+
+    @ivar version: the peer's version, from the most recent connection
+    @ivar oldest_supported: the peer's oldest supported version, same
+
+    @ivar rref: the RemoteReference, if connected, otherwise None
+    @ivar remote_host: the IAddress, if connected, otherwise None
+    """
+
+    def __init__(self, announcement, tub, ic):
+        self._tub = tub
+        self._announcement = announcement
+        self._ic = ic
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+
+        self._furl = furl
+        m = re.match(r'pb://(\w+)@', furl)
+        assert m
+        self._nodeid = b32decode(m.group(1).upper())
+        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
+
+        self._index = (self._nodeid, service_name)
+        self._service_name = service_name
+
+        self.log("attempting to connect to %s" % self._nodeid_s)
+        self.announcement_time = time.time()
+        self.last_loss_time = None
+        self.rref = None
+        self.remote_host = None
+        self.last_connect_time = None
+        self.version = None
+        self.oldest_supported = None
+
+    def log(self, *args, **kwargs):
+        return self._ic.log(*args, **kwargs)
+
+    def get_index(self):
+        return self._index
+
+    def startConnecting(self):
+        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
+
+    def stopConnecting(self):
+        self._reconnector.stopConnecting()
+
+    def _got_service(self, rref):
+        self.last_connect_time = time.time()
+        self.remote_host = str(rref.tracker.broker.transport.getPeer())
+
+        self.rref = rref
+        self.log("connected to %s" % self._nodeid_s)
+
+        self._ic.add_connection(self._nodeid, self._service_name, rref)
+
+        rref.notifyOnDisconnect(self._lost, rref)
+
+    def _lost(self, rref):
+        self.log("lost connection to %s" % self._nodeid_s)
+        self.last_loss_time = time.time()
+        self.rref = None
+        self.remote_host = None
+        self._ic.remove_connection(self._nodeid, self._service_name, rref)
+
+
+
+class IntroducerClient(service.Service, Referenceable):
+    implements(RIIntroducerSubscriberClient, IIntroducerClient)
+
+    def __init__(self, tub, introducer_furl,
+                 nickname, my_version, oldest_supported):
+        self._tub = tub
+        self.introducer_furl = introducer_furl
+
+        self._nickname = nickname
+        self._my_version = my_version
+        self._oldest_supported = oldest_supported
+
+        self._published_announcements = set()
+
+        self._publisher = None
+        self._connected = False
+
+        self._subscribed_service_names = set()
+        self._subscriptions = set() # requests we've actually sent
+        self._received_announcements = set()
+        # TODO: this set will grow without bound, until the node is restarted
+
+        # we only accept one announcement per (peerid+service_name) pair.
+        # This insures that an upgraded host replace their previous
+        # announcement. It also means that each peer must have their own Tub
+        # (no sharing), which is slightly weird but consistent with the rest
+        # of the Tahoe codebase.
+        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
+        # self._connections is a set of (peerid, service_name, rref) tuples
+        self._connections = set()
+
+        #self.counter = PeerCountObserver()
+        self.counter = 0 # incremented each time we change state, for tests
+        self.encoding_parameters = None
+
+    def startService(self):
+        service.Service.startService(self)
+        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+        self._introducer_reconnector = rc
+        def connect_failed(failure):
+            self.log("Initial Introducer connection failed: perhaps it's down",
+                     level=log.WEIRD, failure=failure)
+        d = self._tub.getReference(self.introducer_furl)
+        d.addErrback(connect_failed)
+
+    def _got_introducer(self, publisher):
+        self.log("connected to introducer")
+        self._connected = True
+        self._publisher = publisher
+        publisher.notifyOnDisconnect(self._disconnected)
+        self._maybe_publish()
+        self._maybe_subscribe()
+
+    def _disconnected(self):
+        self.log("bummer, we've lost our connection to the introducer")
+        self._connected = False
+        self._publisher = None
+        self._subscriptions.clear()
+
+    def stopService(self):
+        service.Service.stopService(self)
+        self._introducer_reconnector.stopConnecting()
+        for rsc in self._connectors.itervalues():
+            rsc.stopConnecting()
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+
+    def publish(self, furl, service_name, remoteinterface_name):
+        ann = (furl, service_name, remoteinterface_name,
+               self._nickname, self._my_version, self._oldest_supported)
+        self._published_announcements.add(ann)
+        self._maybe_publish()
+
+    def subscribe_to(self, service_name):
+        self._subscribed_service_names.add(service_name)
+        self._maybe_subscribe()
+
+    def _maybe_subscribe(self):
+        if not self._publisher:
+            self.log("want to subscribe, but no introducer yet",
+                     level=log.NOISY)
+            return
+        for service_name in self._subscribed_service_names:
+            if service_name not in self._subscriptions:
+                # there is a race here, but the subscription desk ignores
+                # duplicate requests.
+                self._subscriptions.add(service_name)
+                d = self._publisher.callRemote("subscribe", self, service_name)
+                d.addErrback(log.err, facility="tahoe.introducer",
+                             level=log.WEIRD)
+
+    def _maybe_publish(self):
+        if not self._publisher:
+            self.log("want to publish, but no introducer yet", level=log.NOISY)
+            return
+        # this re-publishes everything. The Introducer ignores duplicates
+        for ann in self._published_announcements:
+            d = self._publisher.callRemote("publish", ann)
+            d.addErrback(log.err, facility="tahoe.introducer",
+                         level=log.WEIRD)
+
+
+
+    def remote_announce(self, announcements):
+        for ann in announcements:
+            self.log("received %d announcements" % len(announcements))
+            (furl, service_name, ri_name, nickname, ver, oldest) = ann
+            if service_name not in self._subscribed_service_names:
+                self.log("announcement for a service we don't care about [%s]"
+                         % (service_name,), level=log.WEIRD)
+                continue
+            if ann in self._received_announcements:
+                self.log("ignoring old announcement: %s" % (ann,),
+                         level=log.NOISY)
+                continue
+            self.log("new announcement[%s]: %s" % (service_name, ann))
+            self._received_announcements.add(ann)
+            self._new_announcement(ann)
+
+    def _new_announcement(self, announcement):
+        # this will only be called for new announcements
+        rsc = RemoteServiceConnector(announcement, self._tub, self)
+        index = rsc.get_index()
+        if index in self._connectors:
+            self._connectors[index].stopConnecting()
+        self._connectors[index] = rsc
+        rsc.startConnecting()
+
+    def add_connection(self, nodeid, service_name, rref):
+        self._connections.add( (nodeid, service_name, rref) )
+        self.counter += 1
+
+    def remove_connection(self, nodeid, service_name, rref):
+        self._connections.discard( (nodeid, service_name, rref) )
+        self.counter += 1
+
+
+    def get_all_connections(self):
+        return frozenset(self._connections)
+
+    def get_all_connectors(self):
+        return self._connectors.copy()
+
+    def get_all_peerids(self):
+        return frozenset([peerid
+                          for (peerid, service_name, rref)
+                          in self._connections])
+
+    def get_all_connections_for(self, service_name):
+        return frozenset([c
+                          for c in self._connections
+                          if c[1] == service_name])
+
+    def get_permuted_peers(self, service_name, key):
+        """Return an ordered list of (peerid, rref) tuples."""
+        # TODO: flags like add-myself-at-beginning and remove-myself? maybe
+        # not.
+
+        results = []
+        for (c_peerid, c_service_name, rref) in self._connections:
+            assert isinstance(c_peerid, str)
+            if c_service_name != service_name:
+                continue
+            #if not include_myself and peerid == self.nodeid:
+            #    self.log("get_permuted_peers: removing myself from the list")
+            #    continue
+            permuted = sha.new(key + c_peerid).digest()
+            results.append((permuted, c_peerid, rref))
+
+        results.sort(lambda a,b: cmp(a[0], b[0]))
+        return [ (r[1], r[2]) for r in results ]
+
+    def _TODO__add_ourselves(self, partial_peerlist, peerlist):
+        # moved here from mutable.Publish
+        my_peerid = self._node._client.nodeid
+        for (permutedid, peerid, conn) in partial_peerlist:
+            if peerid == my_peerid:
+                # we're already in there
+                return partial_peerlist
+        for (permutedid, peerid, conn) in peerlist:
+            if peerid == self._node._client.nodeid:
+                # found it
+                partial_peerlist.append( (permutedid, peerid, conn) )
+                return partial_peerlist
+        self.log("we aren't in our own peerlist??", level=log.WEIRD)
+        return partial_peerlist
+
+
+
+    def remote_set_encoding_parameters(self, parameters):
+        self.encoding_parameters = parameters
+
+    def connected_to_introducer(self):
+        return self._connected
+
+    def debug_disconnect_from_peerid(self, victim_nodeid):
+        # for unit tests: locate and sever all connections to the given
+        # peerid.
+        for (nodeid, service_name, rref) in self._connections:
+            if nodeid == victim_nodeid:
+                rref.tracker.broker.transport.loseConnection()
index b5ee5a340c145ff1371320b06727fcaf8d8f6217..f58f54f5db41cc908fdff18d05d9ec34315d3c4b 100644 (file)
@@ -301,16 +301,17 @@ class Retrieve:
 
     def _choose_initial_peers(self, numqueries):
         n = self._node
-        full_peerlist = n._client.get_permuted_peers(self._storage_index,
-                                                     include_myself=True)
+        full_peerlist = n._client.get_permuted_peers("storage",
+                                                     self._storage_index)
+        # TODO: include_myself=True
+
         # _peerlist is a list of (peerid,conn) tuples for peers that are
         # worth talking too. This starts with the first numqueries in the
         # permuted list. If that's not enough to get us a recoverable
         # version, we expand this to include the first 2*total_shares peerids
         # (assuming we learn what total_shares is from one of the first
         # numqueries peers)
-        self._peerlist = [(p[1],p[2])
-                          for p in islice(full_peerlist, numqueries)]
+        self._peerlist = [p for p in islice(full_peerlist, numqueries)]
         # _peerlist_limit is the query limit we used to build this list. If
         # we later increase this limit, it may be useful to re-scan the
         # permuted list.
@@ -323,33 +324,20 @@ class Retrieve:
         self._queries_outstanding = set()
         self._used_peers = set()
         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
-        self._peer_storage_servers = {}
         dl = []
-        for (peerid, conn) in peerlist:
+        for (peerid, ss) in peerlist:
             self._queries_outstanding.add(peerid)
-            self._do_query(conn, peerid, self._storage_index, self._read_size,
-                           self._peer_storage_servers)
+            self._do_query(ss, peerid, self._storage_index, self._read_size)
 
         # control flow beyond this point: state machine. Receiving responses
         # from queries is the input. We might send out more queries, or we
         # might produce a result.
         return None
 
-    def _do_query(self, conn, peerid, storage_index, readsize,
-                  peer_storage_servers):
+    def _do_query(self, ss, peerid, storage_index, readsize):
         self._queries_outstanding.add(peerid)
-        if peerid in peer_storage_servers:
-            d = defer.succeed(peer_storage_servers[peerid])
-        else:
-            d = conn.callRemote("get_service", "storageserver")
-            def _got_storageserver(ss):
-                peer_storage_servers[peerid] = ss
-                return ss
-            d.addCallback(_got_storageserver)
-        d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index,
-                                               [], [(0, readsize)]))
-        d.addCallback(self._got_results, peerid, readsize,
-                      (conn, storage_index, peer_storage_servers))
+        d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
+        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
         d.addErrback(self._query_failed, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
@@ -377,9 +365,8 @@ class Retrieve:
                 # TODO: for MDMF, sanity-check self._read_size: don't let one
                 # server cause us to try to read gigabytes of data from all
                 # other servers.
-                (conn, storage_index, peer_storage_servers) = stuff
-                self._do_query(conn, peerid, storage_index, self._read_size,
-                               peer_storage_servers)
+                (ss, storage_index) = stuff
+                self._do_query(ss, peerid, storage_index, self._read_size)
                 return
             except CorruptShareError, e:
                 # log it and give the other shares a chance to be processed
@@ -514,19 +501,19 @@ class Retrieve:
         self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
         if self._peerlist_limit < search_distance:
             # we might be able to get some more peers from the list
-            peers = self._node._client.get_permuted_peers(self._storage_index,
-                                                          include_myself=True)
-            self._peerlist = [(p[1],p[2])
-                              for p in islice(peers, search_distance)]
+            peers = self._node._client.get_permuted_peers("storage",
+                                                          self._storage_index)
+            # TODO: include_myself=True
+            self._peerlist = [p for p in islice(peers, search_distance)]
             self._peerlist_limit = search_distance
             self.log("added peers, peerlist=%d, peerlist_limit=%d"
                      % (len(self._peerlist), self._peerlist_limit),
                      level=log.UNUSUAL)
         # are there any peers on the list that we haven't used?
         new_query_peers = []
-        for (peerid, conn) in self._peerlist:
+        for (peerid, ss) in self._peerlist:
             if peerid not in self._used_peers:
-                new_query_peers.append( (peerid, conn) )
+                new_query_peers.append( (peerid, ss) )
                 if len(new_query_peers) > 5:
                     # only query in batches of 5. TODO: this is pretty
                     # arbitrary, really I want this to be something like
@@ -535,10 +522,8 @@ class Retrieve:
         if new_query_peers:
             self.log("sending %d new queries (read %d bytes)" %
                      (len(new_query_peers), self._read_size), level=log.UNUSUAL)
-            for (peerid, conn) in new_query_peers:
-                self._do_query(conn, peerid,
-                               self._storage_index, self._read_size,
-                               self._peer_storage_servers)
+            for (peerid, ss) in new_query_peers:
+                self._do_query(ss, peerid, self._storage_index, self._read_size)
             # we'll retrigger when those queries come back
             return
 
@@ -803,26 +788,27 @@ class Publish:
         # the share we use for ourselves didn't count against the N total..
         # maybe use N+1 if we find ourselves in the permuted list?
 
-        peerlist = self._node._client.get_permuted_peers(storage_index,
-                                                         include_myself=True)
+        peerlist = self._node._client.get_permuted_peers("storage",
+                                                         storage_index)
+        # make sure our local server is in the list
+        # TODO: include_myself_at_beginning=True
 
         current_share_peers = DictOfSets()
         reachable_peers = {}
-        # list of (peerid, offset, length) where the encprivkey might be found
+        # list of (peerid, shnum, offset, length) where the encprivkey might
+        # be found
         self._encprivkey_shares = []
 
         EPSILON = total_shares / 2
         #partial_peerlist = islice(peerlist, total_shares + EPSILON)
         partial_peerlist = peerlist[:total_shares+EPSILON]
 
-        # make sure our local server is in the list
-        partial_peerlist = self._add_ourselves(partial_peerlist, peerlist)
+        self._storage_servers = {}
 
-        peer_storage_servers = {}
         dl = []
-        for (permutedid, peerid, conn) in partial_peerlist:
-            d = self._do_query(conn, peerid, peer_storage_servers,
-                               storage_index)
+        for permutedid, (peerid, ss) in enumerate(partial_peerlist):
+            self._storage_servers[peerid] = ss
+            d = self._do_query(ss, peerid, storage_index)
             d.addCallback(self._got_query_results,
                           peerid, permutedid,
                           reachable_peers, current_share_peers)
@@ -830,7 +816,7 @@ class Publish:
         d = defer.DeferredList(dl)
         d.addCallback(self._got_all_query_results,
                       total_shares, reachable_peers,
-                      current_share_peers, peer_storage_servers)
+                      current_share_peers)
         # TODO: add an errback to, probably to ignore that peer
         # TODO: if we can't get a privkey from these servers, consider
         # looking farther afield. Make sure we include ourselves in the
@@ -839,28 +825,10 @@ class Publish:
         # but ourselves.
         return d
 
-    def _add_ourselves(self, partial_peerlist, peerlist):
-        my_peerid = self._node._client.nodeid
-        for (permutedid, peerid, conn) in partial_peerlist:
-            if peerid == my_peerid:
-                # we're already in there
-                return partial_peerlist
-        for (permutedid, peerid, conn) in peerlist:
-            if peerid == self._node._client.nodeid:
-                # found it
-                partial_peerlist.append( (permutedid, peerid, conn) )
-                return partial_peerlist
-        self.log("we aren't in our own peerlist??", level=log.WEIRD)
-        return partial_peerlist
-
-    def _do_query(self, conn, peerid, peer_storage_servers, storage_index):
+    def _do_query(self, ss, peerid, storage_index):
         self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
-        d = conn.callRemote("get_service", "storageserver")
-        def _got_storageserver(ss):
-            peer_storage_servers[peerid] = ss
-            return ss.callRemote("slot_readv",
-                                 storage_index, [], [(0, self._read_size)])
-        d.addCallback(_got_storageserver)
+        d = ss.callRemote("slot_readv",
+                          storage_index, [], [(0, self._read_size)])
         return d
 
     def _got_query_results(self, datavs, peerid, permutedid,
@@ -927,7 +895,7 @@ class Publish:
             # files (since the privkey will be small enough to fit in the
             # write cap).
 
-            self._encprivkey_shares.append( (peerid, shnum, offset, length) )
+            self._encprivkey_shares.append( (peerid, shnum, offset, length))
             return
 
         (seqnum, root_hash, IV, k, N, segsize, datalen,
@@ -954,7 +922,7 @@ class Publish:
 
     def _got_all_query_results(self, res,
                                total_shares, reachable_peers,
-                               current_share_peers, peer_storage_servers):
+                               current_share_peers):
         self.log("_got_all_query_results")
         # now that we know everything about the shares currently out there,
         # decide where to place the new shares.
@@ -1019,7 +987,7 @@ class Publish:
 
         assert not shares_needing_homes
 
-        target_info = (target_map, shares_per_peer, peer_storage_servers)
+        target_info = (target_map, shares_per_peer)
         return target_info
 
     def _obtain_privkey(self, target_info):
@@ -1032,16 +1000,16 @@ class Publish:
         # peers one at a time until we get a copy. Only bother asking peers
         # who've admitted to holding a share.
 
-        target_map, shares_per_peer, peer_storage_servers = target_info
+        target_map, shares_per_peer = target_info
         # pull shares from self._encprivkey_shares
         if not self._encprivkey_shares:
             raise NotEnoughPeersError("Unable to find a copy of the privkey")
 
         (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
+        ss = self._storage_servers[peerid]
         self.log("trying to obtain privkey from %s shnum %d" %
                  (idlib.shortnodeid_b2a(peerid), shnum))
-        d = self._do_privkey_query(peer_storage_servers[peerid], peerid,
-                                   shnum, offset, length)
+        d = self._do_privkey_query(ss, peerid, shnum, offset, length)
         d.addErrback(self.log_err)
         d.addCallback(lambda res: self._obtain_privkey(target_info))
         return d
@@ -1174,7 +1142,7 @@ class Publish:
         # surprises here are *not* indications of UncoordinatedWriteError,
         # and we'll need to respond to them more gracefully.)
 
-        target_map, shares_per_peer, peer_storage_servers = target_info
+        target_map, shares_per_peer = target_info
 
         my_checkstring = pack_checkstring(seqnum, root_hash, IV)
         peer_messages = {}
@@ -1206,7 +1174,7 @@ class Publish:
             cancel_secret = self._node.get_cancel_secret(peerid)
             secrets = (write_enabler, renew_secret, cancel_secret)
 
-            d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
+            d = self._do_testreadwrite(peerid, secrets,
                                        tw_vectors, read_vector)
             d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
                           peerid, expected_old_shares[peerid], dispatch_map)
@@ -1216,16 +1184,16 @@ class Publish:
         d.addCallback(lambda res: (self._surprised, dispatch_map))
         return d
 
-    def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
+    def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
-        conn = peer_storage_servers[peerid]
         storage_index = self._node._uri.storage_index
+        ss = self._storage_servers[peerid]
 
-        d = conn.callRemote("slot_testv_and_readv_and_writev",
-                            storage_index,
-                            secrets,
-                            tw_vectors,
-                            read_vector)
+        d = ss.callRemote("slot_testv_and_readv_and_writev",
+                          storage_index,
+                          secrets,
+                          tw_vectors,
+                          read_vector)
         return d
 
     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
index 6e61087cbbd5ad57d3f017b997bc293254d0ed53..4da21e2c4bd206f2a213da4583c424c832c92c50 100644 (file)
@@ -50,10 +50,8 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (pmpeerid, peerid, connection) in self._peer_getter(storage_index):
-            d = connection.callRemote("get_service", "storageserver")
-            d.addCallback(lambda ss: ss.callRemote("get_buckets",
-                                                   storage_index))
+        for (peerid, ss) in self._peer_getter("storage", storage_index):
+            d = ss.callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
             dl.append(d)
index 49fa7eae857976040390d91d0a91f280344c250a..c50aa58a2125589e631f530f51d1f4e4164265a5 100644 (file)
@@ -11,6 +11,7 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
      BadWriteEnablerError, IStatsProducer
 from allmydata.util import fileutil, idlib, mathutil, log
 from allmydata.util.assertutil import precondition, _assert
+import allmydata # for __version__
 
 class DataTooLargeError(Exception):
     pass
@@ -669,14 +670,20 @@ class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer, IStatsProducer)
     name = 'storageserver'
 
-    def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None):
+    # we're pretty narrow-minded right now
+    OLDEST_SUPPORTED_VERSION = allmydata.__version__
+
+    def __init__(self, storedir, sizelimit=None,
+                 discard_storage=False, readonly_storage=False,
+                 stats_provider=None):
         service.MultiService.__init__(self)
         self.storedir = storedir
         sharedir = os.path.join(storedir, "shares")
         fileutil.make_dirs(sharedir)
         self.sharedir = sharedir
         self.sizelimit = sizelimit
-        self.no_storage = no_storage
+        self.no_storage = discard_storage
+        self.readonly_storage = readonly_storage
         self.stats_provider = stats_provider
         if self.stats_provider:
             self.stats_provider.register_producer(self)
@@ -684,12 +691,17 @@ class StorageServer(service.MultiService, Referenceable):
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
+        lp = log.msg("StorageServer created, now measuring space..",
+                     facility="tahoe.storage")
         self.measure_size()
+        log.msg(format="space measurement done, consumed=%(consumed)d bytes",
+                consumed=self.consumed,
+                parent=lp, facility="tahoe.storage")
 
     def log(self, *args, **kwargs):
-        if self.parent:
-            return self.parent.log(*args, **kwargs)
-        return
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.storage"
+        return log.msg(*args, **kwargs)
 
     def setNodeID(self, nodeid):
         # somebody must set this before any slots can be created or leases
@@ -720,6 +732,9 @@ class StorageServer(service.MultiService, Referenceable):
             space += bw.allocated_size()
         return space
 
+    def remote_get_versions(self):
+        return (str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION))
+
     def remote_allocate_buckets(self, storage_index,
                                 renew_secret, cancel_secret,
                                 sharenums, allocated_size,
@@ -754,6 +769,10 @@ class StorageServer(service.MultiService, Referenceable):
             sf = ShareFile(fn)
             sf.add_or_renew_lease(lease_info)
 
+        if self.readonly_storage:
+            # we won't accept new shares
+            return alreadygot, bucketwriters
+
         for shnum in sharenums:
             incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
             finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
index 764d25bf2db7eaa653971e5ed5aa393786a52481..2cab1d4963c87803352c504aaf9a1907fb628c6e 100644 (file)
@@ -12,10 +12,12 @@ from foolscap.eventual import flushEventualQueue
 
 class FakeIntroducerClient(introducer.IntroducerClient):
     def __init__(self):
-        self.connections = {}
-
-def permute(c, key):
-    return [ y for x, y, z in c.get_permuted_peers(key) ]
+        self._connections = set()
+    def add_peer(self, nodeid):
+        entry = (nodeid, "storage", "rref")
+        self._connections.add(entry)
+    def remove_all_peers(self):
+        self._connections.clear()
 
 class Basic(unittest.TestCase):
     def test_loadable(self):
@@ -94,6 +96,10 @@ class Basic(unittest.TestCase):
         self.failUnlessEqual(c.getServiceNamed("storageserver").sizelimit,
                              None)
 
+    def _permute(self, c, key):
+        return [ peerid
+                 for (peerid,rref) in c.get_permuted_peers("storage", key) ]
+
     def test_permute(self):
         basedir = "test_client.Basic.test_permute"
         os.mkdir(basedir)
@@ -102,17 +108,18 @@ class Basic(unittest.TestCase):
         c = client.Client(basedir)
         c.introducer_client = FakeIntroducerClient()
         for k in ["%d" % i for i in range(5)]:
-            c.introducer_client.connections[k] = None
-        self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2'])
-        self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3'])
-        c.introducer_client.connections.clear()
-        self.failUnlessEqual(permute(c, "one"), [])
+            c.introducer_client.add_peer(k)
+
+        self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
+        self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
+        c.introducer_client.remove_all_peers()
+        self.failUnlessEqual(self._permute(c, "one"), [])
 
         c2 = client.Client(basedir)
         c2.introducer_client = FakeIntroducerClient()
         for k in ["%d" % i for i in range(5)]:
-            c2.introducer_client.connections[k] = None
-        self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2'])
+            c2.introducer_client.add_peer(k)
+        self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
 
     def test_versions(self):
         basedir = "test_client.Basic.test_versions"
@@ -120,7 +127,8 @@ class Basic(unittest.TestCase):
         open(os.path.join(basedir, "introducer.furl"), "w").write("")
         open(os.path.join(basedir, "vdrive.furl"), "w").write("")
         c = client.Client(basedir)
-        mine, oldest = c.remote_get_versions()
+        ss = c.getServiceNamed("storageserver")
+        mine, oldest = ss.remote_get_versions()
         self.failUnlessEqual(mine, str(allmydata.__version__))
         self.failIfEqual(str(allmydata.__version__), "unknown")
         self.failUnless("." in str(allmydata.__version__),
index 8599512463ae4a051592217107a0ed038e538804..7ce1293b4e01e9074cf0063f3e0176db58c98921 100644 (file)
@@ -43,7 +43,7 @@ class FakeClient(service.MultiService):
         return True
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
-    def get_permuted_peers(self, storage_index):
+    def get_permuted_peers(self, service_name, storage_index):
         return []
 
 def flush_but_dont_ignore(res):
index 925acd0dc71dedafa8bcf8aaed1b49377d10ff54..5f154b4bfe656e834534769ba237d4cd0ae915d9 100644 (file)
@@ -1,9 +1,9 @@
-from base64 import b32encode
+from base64 import b32decode
 
 import os
 
 from twisted.trial import unittest
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.python import log
 
 from foolscap import Tub, Referenceable
@@ -66,10 +66,8 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
 
 
     def test_create(self):
-        ic = IntroducerClient(None, "introducer", "myfurl")
-        def _ignore(nodeid, rref):
-            pass
-        ic.notify_on_new_connection(_ignore)
+        ic = IntroducerClient(None, "introducer.furl", "my_nickname",
+                              "my_version", "oldest_version")
 
     def test_listen(self):
         i = IntroducerService()
@@ -87,7 +85,7 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
 
         i = IntroducerService()
         i.setServiceParent(self.parent)
-        iurl = tub.registerReference(i)
+        introducer_furl = tub.registerReference(i)
         NUMCLIENTS = 5
         # we have 5 clients who publish themselves, and an extra one which
         # does not. When the connections are fully established, all six nodes
@@ -106,71 +104,82 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
 
             n = FakeNode()
             log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
+            c = IntroducerClient(tub, introducer_furl,
+                                 "nickname-%d" % i, "version", "oldest")
             if i < NUMCLIENTS:
                 node_furl = tub.registerReference(n)
-            else:
-                node_furl = None
-            c = IntroducerClient(tub, iurl, node_furl)
+                c.publish(node_furl, "storage", "ri_name")
+            # the last one does not publish anything
+
+            c.subscribe_to("storage")
 
             c.setServiceParent(self.parent)
             clients.append(c)
             tubs[c] = tub
 
-        def _wait_for_all_connections(res):
-            dl = [] # list of when_enough_peers() for each peer
-            # will fire once everybody is connected
+        def _wait_for_all_connections():
             for c in clients:
-                dl.append(c.when_enough_peers(NUMCLIENTS))
-            return defer.DeferredList(dl, fireOnOneErrback=True)
-
-        d = _wait_for_all_connections(None)
+                if len(c.get_all_connections()) < NUMCLIENTS:
+                    return False
+            return True
+        d = self.poll(_wait_for_all_connections, timeout=5)
 
         def _check1(res):
             log.msg("doing _check1")
             for c in clients:
-                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-                self.failUnless(c._connected) # to the introducer
+                self.failUnless(c.connected_to_introducer())
+                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
+                self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
+                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+                                     NUMCLIENTS)
         d.addCallback(_check1)
+
         origin_c = clients[0]
         def _disconnect_somebody_else(res):
             # now disconnect somebody's connection to someone else
-            # find a target that is not themselves
-            for nodeid,rref in origin_c.connections.items():
-                if b32encode(nodeid).lower() != tubs[origin_c].tubID:
-                    victim = rref
-                    break
-            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
-            victim.tracker.broker.transport.loseConnection()
+            current_counter = origin_c.counter
+            victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
+            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID,
+                                               victim_nodeid))
+            origin_c.debug_disconnect_from_peerid(victim_nodeid)
             log.msg(" did disconnect")
+
+            # then wait until something changes, which ought to be them
+            # noticing the loss
+            def _compare():
+                return current_counter != origin_c.counter
+            return self.poll(_compare, timeout=5)
+
         d.addCallback(_disconnect_somebody_else)
-        def _wait_til_he_notices(res):
-            # wait til the origin_c notices the loss
-            log.msg(" waiting until peer notices the disconnection")
-            return origin_c.when_fewer_than_peers(NUMCLIENTS)
-        d.addCallback(_wait_til_he_notices)
-        d.addCallback(_wait_for_all_connections)
+
+        # and wait for them to reconnect
+        d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5))
         def _check2(res):
             log.msg("doing _check2")
             for c in clients:
-                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+                self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
         d.addCallback(_check2)
+
         def _disconnect_yourself(res):
             # now disconnect somebody's connection to themselves.
-            # find a target that *is* themselves
-            for nodeid,rref in origin_c.connections.items():
-                if b32encode(nodeid).lower() == tubs[origin_c].tubID:
-                    victim = rref
-                    break
-            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
-            victim.tracker.broker.transport.loseConnection()
+            current_counter = origin_c.counter
+            victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
+            log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID,
+                                               victim_nodeid))
+            origin_c.debug_disconnect_from_peerid(victim_nodeid)
             log.msg(" did disconnect from self")
+
+            def _compare():
+                return current_counter != origin_c.counter
+            return self.poll(_compare, timeout=5)
         d.addCallback(_disconnect_yourself)
-        d.addCallback(_wait_til_he_notices)
-        d.addCallback(_wait_for_all_connections)
+
+        d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5))
         def _check3(res):
             log.msg("doing _check3")
             for c in clients:
-                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+                                     NUMCLIENTS)
         d.addCallback(_check3)
         def _shutdown_introducer(res):
             # now shut down the introducer. We do this by shutting down the
@@ -180,100 +189,19 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
             log.msg("shutting down the introducer")
             return self.central_tub.disownServiceParent()
         d.addCallback(_shutdown_introducer)
-        d.addCallback(self.stall, 2)
+        def _wait_for_introducer_loss():
+            for c in clients:
+                if c.connected_to_introducer():
+                    return False
+            return True
+        d.addCallback(lambda res: self.poll(_wait_for_introducer_loss, timeout=5))
+
         def _check4(res):
             log.msg("doing _check4")
             for c in clients:
-                self.failUnlessEqual(len(c.connections), NUMCLIENTS)
-                self.failIf(c._connected)
+                self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+                                     NUMCLIENTS)
+                self.failIf(c.connected_to_introducer())
         d.addCallback(_check4)
         return d
-    test_system.timeout = 2400
-
-    def stall(self, res, timeout):
-        d = defer.Deferred()
-        reactor.callLater(timeout, d.callback, res)
-        return d
-
-    def test_system_this_one_breaks(self):
-        # this uses a single Tub, which has a strong effect on the
-        # failingness
-        tub = Tub()
-        tub.setOption("logLocalFailures", True)
-        tub.setOption("logRemoteFailures", True)
-        tub.setServiceParent(self.parent)
-        l = tub.listenOn("tcp:0")
-        portnum = l.getPortnum()
-        tub.setLocation("localhost:%d" % portnum)
-
-        i = IntroducerService()
-        i.setServiceParent(self.parent)
-        iurl = tub.registerReference(i)
-
-        clients = []
-        for i in range(5):
-            n = FakeNode()
-            node_furl = tub.registerReference(n)
-            c = IntroducerClient(tub, iurl, node_furl)
-            c.setServiceParent(self.parent)
-            clients.append(c)
-
-        # time passes..
-        d = defer.Deferred()
-        def _check(res):
-            log.msg("doing _check")
-            self.failUnlessEqual(len(clients[0].connections), 5)
-        d.addCallback(_check)
-        reactor.callLater(2, d.callback, None)
-        return d
-    del test_system_this_one_breaks
-
-
-    def test_system_this_one_breaks_too(self):
-        # this one shuts down so quickly that it fails in a different way
-        self.central_tub = tub = Tub()
-        tub.setOption("logLocalFailures", True)
-        tub.setOption("logRemoteFailures", True)
-        tub.setServiceParent(self.parent)
-        l = tub.listenOn("tcp:0")
-        portnum = l.getPortnum()
-        tub.setLocation("localhost:%d" % portnum)
-
-        i = IntroducerService()
-        i.setServiceParent(self.parent)
-        iurl = tub.registerReference(i)
-
-        clients = []
-        for i in range(5):
-            tub = Tub()
-            tub.setOption("logLocalFailures", True)
-            tub.setOption("logRemoteFailures", True)
-            tub.setServiceParent(self.parent)
-            l = tub.listenOn("tcp:0")
-            portnum = l.getPortnum()
-            tub.setLocation("localhost:%d" % portnum)
 
-            n = FakeNode()
-            node_furl = tub.registerReference(n)
-            c = IntroducerClient(tub, iurl, node_furl)
-            c.setServiceParent(self.parent)
-            clients.append(c)
-
-        # time passes..
-        d = defer.Deferred()
-        reactor.callLater(0.01, d.callback, None)
-        def _check(res):
-            log.msg("doing _check")
-            self.fail("BOOM")
-            for c in clients:
-                self.failUnlessEqual(len(c.connections), 5)
-            c.connections.values()[0].tracker.broker.transport.loseConnection()
-            return self.stall(None, 2)
-        d.addCallback(_check)
-        def _check_again(res):
-            log.msg("doing _check_again")
-            for c in clients:
-                self.failUnlessEqual(len(c.connections), 5)
-        d.addCallback(_check_again)
-        return d
-    del test_system_this_one_breaks_too
index ecea762d463c097b081206afddbe1c362918a28f..0d3460f50d096e064be5edc83a12888f75c5df6d 100644 (file)
@@ -47,12 +47,12 @@ class FakeFilenode(mutable.MutableFileNode):
         return defer.succeed(None)
 
 class FakePublish(mutable.Publish):
-    def _do_query(self, conn, peerid, peer_storage_servers, storage_index):
-        assert conn[0] == peerid
+    def _do_query(self, ss, peerid, storage_index):
+        assert ss[0] == peerid
         shares = self._peers[peerid]
         return defer.succeed(shares)
 
-    def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
+    def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
         # always-pass: parrot the test vectors back to them.
         readv = {}
@@ -113,9 +113,10 @@ class FakeClient:
         res = FakeFilenode(self).init_from_uri(u)
         return res
 
-    def get_permuted_peers(self, key, include_myself=True):
+    def get_permuted_peers(self, service_name, key):
+        # TODO: include_myself=True
         """
-        @return: list of (permuted-peerid, peerid, connection,)
+        @return: list of (peerid, connection,)
         """
         peers_and_connections = [(pid, (pid,)) for pid in self._peerids]
         results = []
@@ -124,6 +125,7 @@ class FakeClient:
             permuted = sha.new(key + peerid).digest()
             results.append((permuted, peerid, connection))
         results.sort()
+        results = [ (r[1],r[2]) for r in results]
         return results
 
     def upload(self, uploadable):
@@ -299,7 +301,7 @@ class Publish(unittest.TestCase):
         total_shares = 10
         d = p._query_peers(total_shares)
         def _done(target_info):
-            (target_map, shares_per_peer, peer_storage_servers) = target_info
+            (target_map, shares_per_peer) = target_info
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
@@ -321,7 +323,7 @@ class Publish(unittest.TestCase):
         total_shares = 10
         d = p._query_peers(total_shares)
         def _done(target_info):
-            (target_map, shares_per_peer, peer_storage_servers) = target_info
+            (target_map, shares_per_peer) = target_info
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
index 8bcb70698f75a9b228d88d5d2828b6aaba561d3a..a9c90d6839e92a6d4d67e478b2a07468ace59456 100644 (file)
@@ -32,7 +32,7 @@ This is some data to publish to the virtual drive, which needs to be large
 enough to not fit inside a LIT uri.
 """
 
-class SystemTest(testutil.SignalMixin, unittest.TestCase):
+class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
 
     def setUp(self):
         self.sparent = service.MultiService()
@@ -135,18 +135,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(lambda res: c)
         return d
 
+    def _check_connections(self):
+        for c in self.clients:
+            ic = c.introducer_client
+            if not ic.connected_to_introducer():
+                return False
+            if len(ic.get_all_peerids()) != self.numclients:
+                return False
+        return True
+
     def wait_for_connections(self, ignored=None):
         # TODO: replace this with something that takes a list of peerids and
         # fires when they've all been heard from, instead of using a count
         # and a threshold
-        for c in self.clients:
-            if (not c.introducer_client or
-                len(list(c.get_all_peerids())) != self.numclients):
-                d = defer.Deferred()
-                d.addCallback(self.wait_for_connections)
-                reactor.callLater(0.05, d.callback, None)
-                return d
-        return defer.succeed(None)
+        return self.poll(self._check_connections, timeout=200)
 
     def test_connections(self):
         self.basedir = "system/SystemTest/test_connections"
@@ -158,10 +160,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             for c in self.clients:
                 all_peerids = list(c.get_all_peerids())
                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
-                permuted_peers = list(c.get_permuted_peers("a", True))
+                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
-                permuted_other_peers = list(c.get_permuted_peers("a", False))
-                self.failUnlessEqual(len(permuted_other_peers), self.numclients)
 
         d.addCallback(_check)
         def _shutdown_extra_node(res):
@@ -196,10 +196,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             for c in self.clients:
                 all_peerids = list(c.get_all_peerids())
                 self.failUnlessEqual(len(all_peerids), self.numclients)
-                permuted_peers = list(c.get_permuted_peers("a", True))
+                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                 self.failUnlessEqual(len(permuted_peers), self.numclients)
-                permuted_other_peers = list(c.get_permuted_peers("a", False))
-                self.failUnlessEqual(len(permuted_other_peers), self.numclients-1)
         d.addCallback(_check_connections)
         def _do_upload(res):
             log.msg("UPLOADING")
@@ -266,8 +264,12 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
 
         def _download_nonexistent_uri(res):
             baduri = self.mangle_uri(self.uri)
+            log.msg("about to download non-existent URI", level=log.UNUSUAL,
+                    facility="tahoe.tests")
             d1 = self.downloader.download_to_data(baduri)
             def _baduri_should_fail(res):
+                log.msg("finished downloading non-existend URI",
+                        level=log.UNUSUAL, facility="tahoe.tests")
                 self.failUnless(isinstance(res, Failure))
                 self.failUnless(res.check(download.NotEnoughPeersError),
                                 "expected NotEnoughPeersError, got %s" % res)
@@ -834,9 +836,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(self.log, "GOT WEB LISTENER")
         return d
 
-    def log(self, res, msg):
+    def log(self, res, msg, **kwargs):
         # print "MSG: %s  RES: %s" % (msg, res)
-        log.msg(msg)
+        log.msg(msg, **kwargs)
         return res
 
     def stall(self, res, delay=1.0):
@@ -1064,7 +1066,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(_got_from_uri)
 
         # download from a bogus URI, make sure we get a reasonable error
-        d.addCallback(self.log, "_get_from_bogus_uri")
+        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
         def _get_from_bogus_uri(res):
             d1 = getPage(base + "uri/%s?filename=%s"
                          % (self.mangle_uri(self.uri), "mydata567"))
@@ -1072,6 +1074,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
                        "410")
             return d1
         d.addCallback(_get_from_bogus_uri)
+        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
 
         # upload a file with PUT
         d.addCallback(self.log, "about to try PUT")
@@ -1364,7 +1367,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
                     peers = set()
                     for shpeers in sharemap.values():
                         peers.update(shpeers)
-                    self.failUnlessEqual(len(peers), self.numclients-1)
+                    self.failUnlessEqual(len(peers), self.numclients)
         d.addCallback(_check_checker_results)
 
         def _check_stored_results(res):
index c402d08a2d5ec6d032f0399573dd4a73cec1d958..0bda732b0beeb7b1f7fc578cc4a97b283095c9a6 100644 (file)
@@ -3,7 +3,6 @@ import os
 from twisted.trial import unittest
 from twisted.python.failure import Failure
 from twisted.python import log
-from twisted.internet import defer
 from cStringIO import StringIO
 
 from allmydata import upload, encode, uri
@@ -69,20 +68,6 @@ class Uploadable(unittest.TestCase):
         d.addCallback(lambda res: u.close())
         return d
 
-class FakePeer:
-    def __init__(self, mode="good"):
-        self.ss = FakeStorageServer(mode)
-
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            meth = getattr(self, methname)
-            return meth(*args, **kwargs)
-        return defer.maybeDeferred(_call)
-
-    def get_service(self, sname):
-        assert sname == "storageserver"
-        return self.ss
-
 class FakeStorageServer:
     def __init__(self, mode):
         self.mode = mode
@@ -155,9 +140,9 @@ class FakeClient:
     def log(self, *args, **kwargs):
         pass
     def get_permuted_peers(self, storage_index, include_myself):
-        peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
+        peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
                   for fakeid in range(self.num_servers) ]
-        self.last_peers = [p[2] for p in peers]
+        self.last_peers = [p[1] for p in peers]
         return peers
     def get_push_to_ourselves(self):
         return None
@@ -353,9 +338,9 @@ class PeerSelection(unittest.TestCase):
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
             for p in self.node.last_peers:
-                allocated = p.ss.allocated
+                allocated = p.allocated
                 self.failUnlessEqual(len(allocated), 1)
-                self.failUnlessEqual(p.ss.queries, 1)
+                self.failUnlessEqual(p.queries, 1)
         d.addCallback(_check)
         return d
 
@@ -370,9 +355,9 @@ class PeerSelection(unittest.TestCase):
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
             for p in self.node.last_peers:
-                allocated = p.ss.allocated
+                allocated = p.allocated
                 self.failUnlessEqual(len(allocated), 2)
-                self.failUnlessEqual(p.ss.queries, 2)
+                self.failUnlessEqual(p.queries, 2)
         d.addCallback(_check)
         return d
 
@@ -389,13 +374,13 @@ class PeerSelection(unittest.TestCase):
             got_one = []
             got_two = []
             for p in self.node.last_peers:
-                allocated = p.ss.allocated
+                allocated = p.allocated
                 self.failUnless(len(allocated) in (1,2), len(allocated))
                 if len(allocated) == 1:
-                    self.failUnlessEqual(p.ss.queries, 1)
+                    self.failUnlessEqual(p.queries, 1)
                     got_one.append(p)
                 else:
-                    self.failUnlessEqual(p.ss.queries, 2)
+                    self.failUnlessEqual(p.queries, 2)
                     got_two.append(p)
             self.failUnlessEqual(len(got_one), 49)
             self.failUnlessEqual(len(got_two), 1)
@@ -414,9 +399,9 @@ class PeerSelection(unittest.TestCase):
         d.addCallback(self._check_large, SIZE_LARGE)
         def _check(res):
             for p in self.node.last_peers:
-                allocated = p.ss.allocated
+                allocated = p.allocated
                 self.failUnlessEqual(len(allocated), 4)
-                self.failUnlessEqual(p.ss.queries, 2)
+                self.failUnlessEqual(p.queries, 2)
         d.addCallback(_check)
         return d
 
@@ -432,7 +417,7 @@ class PeerSelection(unittest.TestCase):
         def _check(res):
             counts = {}
             for p in self.node.last_peers:
-                allocated = p.ss.allocated
+                allocated = p.allocated
                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
             histogram = [counts.get(i, 0) for i in range(5)]
             self.failUnlessEqual(histogram, [0,0,0,2,1])
index ed147f1604f7182fcbb3867c7b062f3c560ddbae..78fa8cb53054f350039efb0ea8712d0d9d1cd45b 100644 (file)
@@ -44,18 +44,17 @@ class TooFullError(Exception):
 EXTENSION_SIZE = 1000
 
 class PeerTracker:
-    def __init__(self, peerid, permutedid, connection,
+    def __init__(self, peerid, storage_server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
                  bucket_renewal_secret, bucket_cancel_secret):
         precondition(isinstance(peerid, str), peerid)
         precondition(len(peerid) == 20, peerid)
         self.peerid = peerid
-        self.permutedid = permutedid
-        self.connection = connection # to an RIClient
+        self._storageserver = storage_server # to an RIStorageServer
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
-        #print "PeerTracker", peerid, permutedid, sharesize
+        #print "PeerTracker", peerid, sharesize
         as = storage.allocated_size(sharesize,
                                     num_segments,
                                     num_share_hashes,
@@ -66,7 +65,6 @@ class PeerTracker:
         self.num_segments = num_segments
         self.num_share_hashes = num_share_hashes
         self.storage_index = storage_index
-        self._storageserver = None
 
         self.renew_secret = bucket_renewal_secret
         self.cancel_secret = bucket_cancel_secret
@@ -77,15 +75,6 @@ class PeerTracker:
                    idlib.b2a(self.storage_index)[:6]))
 
     def query(self, sharenums):
-        if not self._storageserver:
-            d = self.connection.callRemote("get_service", "storageserver")
-            d.addCallback(self._got_storageserver)
-            d.addCallback(lambda res: self._query(sharenums))
-            return d
-        return self._query(sharenums)
-    def _got_storageserver(self, storageserver):
-        self._storageserver = storageserver
-    def _query(self, sharenums):
         #print " query", self.peerid, len(sharenums)
         d = self._storageserver.callRemote("allocate_buckets",
                                            self.storage_index,
@@ -144,7 +133,8 @@ class Tahoe2PeerSelector:
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
 
-        peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+        peers = client.get_permuted_peers("storage", storage_index)
+        # TODO: push_to_ourselves
         if not peers:
             raise encode.NotEnoughPeersError("client gave us zero peers")
 
@@ -167,7 +157,7 @@ class Tahoe2PeerSelector:
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
 
-        trackers = [ PeerTracker(peerid, permutedid, conn,
+        trackers = [ PeerTracker(peerid, conn,
                                  share_size, block_size,
                                  num_segments, num_share_hashes,
                                  storage_index,
@@ -176,7 +166,7 @@ class Tahoe2PeerSelector:
                                  bucket_cancel_secret_hash(file_cancel_secret,
                                                            peerid),
                                  )
-                     for permutedid, peerid, conn in peers ]
+                     for (peerid, conn) in peers ]
         self.uncontacted_peers = trackers
 
         d = defer.maybeDeferred(self._loop)