From daecca65899dd476e92bfdd869b4fdc07d2a6653 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 5 Feb 2008 13:05:13 -0700 Subject: [PATCH] big introducer refactoring: separate publish+subscribe. Addresses #271. --- src/allmydata/checker.py | 8 +- src/allmydata/client.py | 108 +++--- src/allmydata/control.py | 5 +- src/allmydata/download.py | 13 +- src/allmydata/interfaces.py | 146 ++++++-- src/allmydata/introducer.py | 474 +++++++++++++++++++------- src/allmydata/mutable.py | 130 +++---- src/allmydata/offloaded.py | 6 +- src/allmydata/storage.py | 29 +- src/allmydata/test/test_client.py | 32 +- src/allmydata/test/test_helper.py | 2 +- src/allmydata/test/test_introducer.py | 196 ++++------- src/allmydata/test/test_mutable.py | 16 +- src/allmydata/test/test_system.py | 41 +-- src/allmydata/test/test_upload.py | 39 +-- src/allmydata/upload.py | 24 +- 16 files changed, 743 insertions(+), 526 deletions(-) diff --git a/src/allmydata/checker.py b/src/allmydata/checker.py index 66874013..0f47cad5 100644 --- a/src/allmydata/checker.py +++ b/src/allmydata/checker.py @@ -30,7 +30,7 @@ class SimpleCHKFileChecker: # messages (or if we used promises). found = set() for (pmpeerid, peerid, connection) in self.peer_getter(storage_index): - buckets = connection.get_service("storageserver").get_buckets(si) + buckets = connection.get_buckets(si) found.update(buckets.keys()) return len(found) ''' @@ -42,10 +42,8 @@ class SimpleCHKFileChecker: def _get_all_shareholders(self, storage_index): dl = [] - for (pmpeerid, peerid, connection) in self.peer_getter(storage_index): - d = connection.callRemote("get_service", "storageserver") - d.addCallback(lambda ss: ss.callRemote("get_buckets", - storage_index)) + for (peerid, ss) in self.peer_getter("storage", storage_index): + d = ss.callRemote("get_buckets", storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peerid,)) dl.append(d) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 6209abca..3efc6bb1 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,8 +1,6 @@ -import os, sha, stat, time, re -from foolscap import Referenceable -from zope.interface import implements -from allmydata.interfaces import RIClient +import os, stat, time, re +from allmydata.interfaces import RIStorageServer from allmydata import node from twisted.internet import reactor @@ -31,8 +29,7 @@ GiB=1024*MiB TiB=1024*GiB PiB=1024*TiB -class Client(node.Node, Referenceable, testutil.PollMixin): - implements(RIClient) +class Client(node.Node, testutil.PollMixin): PORTNUMFILE = "client.port" STOREDIR = 'storage' NODETYPE = "client" @@ -46,17 +43,19 @@ class Client(node.Node, Referenceable, testutil.PollMixin): # that we will abort an upload unless we can allocate space for at least # this many. 'total' is the total number of shares created by encoding. # If everybody has room then this is is how many we will upload. - DEFAULT_ENCODING_PARAMETERS = {"k":25, - "happy": 75, - "n": 100, + DEFAULT_ENCODING_PARAMETERS = {"k": 3, + "happy": 7, + "n": 10, "max_segment_size": 1*MiB, } def __init__(self, basedir="."): node.Node.__init__(self, basedir) self.logSource="Client" - self.my_furl = None - self.introducer_client = None + self.nickname = self.get_config("nickname") + if self.nickname is None: + self.nickname = "" + self.init_introducer_client() self.init_stats_provider() self.init_lease_secret() self.init_storage() @@ -67,8 +66,6 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.add_service(Checker()) # ControlServer and Helper are attached after Tub startup - self.introducer_furl = self.get_config("introducer.furl", required=True) - hotline_file = os.path.join(self.basedir, self.SUICIDE_PREVENTION_HOTLINE_FILE) if os.path.exists(hotline_file): @@ -81,6 +78,17 @@ class Client(node.Node, Referenceable, testutil.PollMixin): if webport: self.init_web(webport) # strports string + def init_introducer_client(self): + self.introducer_furl = self.get_config("introducer.furl", required=True) + ic = IntroducerClient(self.tub, self.introducer_furl, + self.nickname, + str(allmydata.__version__), + str(self.OLDEST_SUPPORTED_VERSION)) + self.introducer_client = ic + ic.setServiceParent(self) + # nodes that want to upload and download will need storage servers + ic.subscribe_to("storage") + def init_stats_provider(self): gatherer_furl = self.get_config('stats_gatherer.furl') if gatherer_furl: @@ -96,6 +104,12 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self._lease_secret = idlib.a2b(secret_s) def init_storage(self): + # should we run a storage server (and publish it for others to use)? + provide_storage = (self.get_config("no_storage") is None) + if not provide_storage: + return + readonly_storage = (self.get_config("readonly_storage") is not None) + storedir = os.path.join(self.basedir, self.STOREDIR) sizelimit = None @@ -115,8 +129,21 @@ class Client(node.Node, Referenceable, testutil.PollMixin): "G": 1000 * 1000 * 1000, }[suffix] sizelimit = int(number) * multiplier - no_storage = self.get_config("debug_no_storage") is not None - self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider)) + discard_storage = self.get_config("debug_discard_storage") is not None + ss = StorageServer(storedir, sizelimit, + discard_storage, readonly_storage, + self.stats_provider) + self.add_service(ss) + d = self.when_tub_ready() + # we can't do registerReference until the Tub is ready + def _publish(res): + furl_file = os.path.join(self.basedir, "private", "storage.furl") + furl = self.tub.registerReference(ss, furlFile=furl_file) + ri_name = RIStorageServer.__remote_name__ + self.introducer_client.publish(furl, "storage", ri_name) + d.addCallback(_publish) + d.addErrback(log.err, facility="tahoe.storage", level=log.BAD) + def init_options(self): self.push_to_ourselves = None @@ -148,20 +175,10 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.log("tub_ready") node.Node.tub_ready(self) - furl_file = os.path.join(self.basedir, "myself.furl") - self.my_furl = self.tub.registerReference(self, furlFile=furl_file) - - # should we publish ourselves as a server? - provide_storage = (self.get_config("no_storage") is None) - if provide_storage: - my_furl = self.my_furl - else: - my_furl = None - - ic = IntroducerClient(self.tub, self.introducer_furl, my_furl) - self.introducer_client = ic - ic.setServiceParent(self) - + # TODO: replace register_control() with an init_control() that + # internally uses self.when_tub_ready() to stall registerReference. + # Do the same for register_helper(). That will remove the need for + # this tub_ready() method. self.register_control() self.register_helper() @@ -185,43 +202,22 @@ class Client(node.Node, Referenceable, testutil.PollMixin): helper_furlfile = os.path.join(self.basedir, "private", "helper.furl") self.tub.registerReference(h, furlFile=helper_furlfile) - def remote_get_versions(self): - return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION) - - def remote_get_service(self, name): - if name in ("storageserver",): - return self.getServiceNamed(name) - raise RuntimeError("I am unwilling to give you service %s" % name) - - def remote_get_nodeid(self): - return self.nodeid - def get_all_peerids(self): - if not self.introducer_client: - return [] return self.introducer_client.get_all_peerids() - def get_permuted_peers(self, key, include_myself=True): + def get_permuted_peers(self, service_name, key): """ - @return: list of (permuted-peerid, peerid, connection,) + @return: list of (peerid, connection,) """ - results = [] - for peerid, connection in self.introducer_client.get_all_peers(): - assert isinstance(peerid, str) - if not include_myself and peerid == self.nodeid: - self.log("get_permuted_peers: removing myself from the list") - continue - permuted = sha.new(key + peerid).digest() - results.append((permuted, peerid, connection)) - results.sort() - return results + assert isinstance(service_name, str) + assert isinstance(key, str) + return self.introducer_client.get_permuted_peers(service_name, key) def get_push_to_ourselves(self): return self.push_to_ourselves def get_encoding_parameters(self): - if not self.introducer_client: - return self.DEFAULT_ENCODING_PARAMETERS + return self.DEFAULT_ENCODING_PARAMETERS p = self.introducer_client.encoding_parameters # a tuple # TODO: make the 0.7.1 introducer publish a dict instead of a tuple params = {"k": p[0], diff --git a/src/allmydata/control.py b/src/allmydata/control.py index 2428948b..6e7fb910 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -69,7 +69,8 @@ class ControlServer(Referenceable, service.Service, testutil.PollMixin): # phase to take more than 10 seconds. Expect worst-case latency to be # 300ms. results = {} - everyone = list(self.parent.introducer_client.get_all_peers()) + conns = self.parent.introducer_client.get_all_connections_for("storage") + everyone = [(peerid,rref) for (peerid, service_name, rref) in conns] num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3))) everyone = everyone * num_pings d = self._do_one_ping(None, everyone, results) @@ -79,7 +80,7 @@ class ControlServer(Referenceable, service.Service, testutil.PollMixin): return results peerid, connection = everyone_left.pop(0) start = time.time() - d = connection.callRemote("get_nodeid") + d = connection.callRemote("get_versions") def _done(ignored): stop = time.time() elapsed = stop - start diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 28810a5f..592ae42f 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -412,17 +412,14 @@ class FileDownloader: def _get_all_shareholders(self): dl = [] - for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._storage_index): - d = connection.callRemote("get_service", "storageserver") - d.addCallback(lambda ss: ss.callRemote("get_buckets", - self._storage_index)) - d.addCallbacks(self._got_response, self._got_error, - callbackArgs=(connection,)) + for (peerid,ss) in self._client.get_permuted_peers("storage", + self._storage_index): + d = ss.callRemote("get_buckets", self._storage_index) + d.addCallbacks(self._got_response, self._got_error) dl.append(d) return defer.DeferredList(dl) - def _got_response(self, buckets, connection): - _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint + def _got_response(self, buckets): for sharenum, bucket in buckets.iteritems(): b = storage.ReadBucketProxy(bucket) self.add_share_bucket(sharenum, b) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 84a3d5e6..b9e50abc 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -22,10 +22,26 @@ URIExtensionData = StringConstraint(1000) LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests - -class RIIntroducerClient(RemoteInterface): - def new_peers(furls=SetOf(FURL)): +# Announcements are (FURL, service_name, remoteinterface_name, +# nickname, my_version, oldest_supported) +# the (FURL, service_name, remoteinterface_name) refer to the service being +# announced. The (nickname, my_version, oldest_supported) refer to the +# client as a whole. The my_version/oldest_supported strings can be parsed +# by an allmydata.util.version.Version instance, and then compared. The +# first goal is to make sure that nodes are not confused by speaking to an +# incompatible peer. The second goal is to enable the development of +# backwards-compatibility code. + +Announcement = TupleOf(FURL, str, str, + str, str, str) + +class RIIntroducerSubscriberClient(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" + + def announce(announcements=SetOf(Announcement)): + """I accept announcements from the publisher.""" return None + def set_encoding_parameters(parameters=(int, int, int)): """Advise the client of the recommended k-of-n encoding parameters for this grid. 'parameters' is a tuple of (k, desired, n), where 'n' @@ -43,28 +59,103 @@ class RIIntroducerClient(RemoteInterface): """ return None -class RIIntroducer(RemoteInterface): - def hello(node=RIIntroducerClient, furl=ChoiceOf(FURL, None)): +# When Foolscap can handle multiple interfaces (Foolscap#17), the +# full-powered introducer will implement both RIIntroducerPublisher and +# RIIntroducerSubscriberService. Until then, we define +# RIIntroducerPublisherAndSubscriberService as a combination of the two, and +# make everybody use that. + +class RIIntroducerPublisher(RemoteInterface): + """To publish a service to the world, connect to me and give me your + announcement message. I will deliver a copy to all connected subscribers.""" + __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" + + def publish(announcement=Announcement): + # canary? return None -class RIClient(RemoteInterface): - def get_versions(): - """Return a tuple of (my_version, oldest_supported) strings. +class RIIntroducerSubscriberService(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" - Each string can be parsed by an allmydata.util.version.Version - instance, and then compared. The first goal is to make sure that - nodes are not confused by speaking to an incompatible peer. The - second goal is to enable the development of backwards-compatibility - code. + def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): + """Give me a subscriber reference, and I will call its new_peers() + method will any announcements that match the desired service name. I + will ignore duplicate subscriptions. + """ + return None - This method is likely to change in incompatible ways until we get the - whole compatibility scheme nailed down. +class RIIntroducerPublisherAndSubscriberService(RemoteInterface): + __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" + def publish(announcement=Announcement): + return None + def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): + return None + +class IIntroducerClient(Interface): + """I provide service introduction facilities for a node. I help nodes + publish their services to the rest of the world, and I help them learn + about services available on other nodes.""" + + def publish(furl, service_name, remoteinterface_name): + """Once you call this, I will tell the world that the Referenceable + available at FURL is available to provide a service named + SERVICE_NAME. The precise definition of the service being provided is + identified by the Foolscap 'remote interface name' in the last + parameter: this is supposed to be a globally-unique string that + identifies the RemoteInterface that is implemented.""" + + def subscribe_to(service_name): + """Call this if you will eventually want to use services with the + given SERVICE_NAME. This will prompt me to subscribe to announcements + of those services. You can pick up the announcements later by calling + get_all_connections_for() or get_permuted_peers(). """ - return TupleOf(str, str) - def get_service(name=str): - return Referenceable - def get_nodeid(): - return Nodeid + + def get_all_connections(): + """Return a frozenset of (nodeid, service_name, rref) tuples, one for + each active connection we've established to a remote service. This is + mostly useful for unit tests that need to wait until a certain number + of connections have been made.""" + + def get_all_connectors(): + """Return a dict that maps from (nodeid, service_name) to a + RemoteServiceConnector instance for all services that we are actively + trying to connect to. Each RemoteServiceConnector has the following + public attributes:: + + announcement_time: when we first heard about this service + last_connect_time: when we last established a connection + last_loss_time: when we last lost a connection + + version: the peer's version, from the most recent connection + oldest_supported: the peer's oldest supported version, same + + rref: the RemoteReference, if connected, otherwise None + remote_host: the IAddress, if connected, otherwise None + + This method is intended for monitoring interfaces, such as a web page + which describes connecting and connected peers. + """ + + def get_all_peerids(): + """Return a frozenset of all peerids to whom we have a connection (to + one or more services) established. Mostly useful for unit tests.""" + + def get_all_connections_for(service_name): + """Return a frozenset of (nodeid, service_name, rref) tuples, one + for each active connection that provides the given SERVICE_NAME.""" + + def get_permuted_peers(service_name, key): + """Returns an ordered list of (peerid, rref) tuples, selecting from + the connections that provide SERVICE_NAME, using a hash-based + permutation keyed by KEY. This randomizes the service list in a + repeatable way, to distribute load over many peers. + """ + + def connected_to_introducer(): + """Returns a boolean, True if we are currently connected to the + introducer, False if not.""" + class RIBucketWriter(RemoteInterface): def write(offset=int, data=ShareData): @@ -103,6 +194,21 @@ ReadData = ListOf(ShareData) # returns data[offset:offset+length] for each element of TestVector class RIStorageServer(RemoteInterface): + __remote_name__ = "RIStorageServer.tahoe.allmydata.com" + + def get_versions(): + """Return a tuple of (my_version, oldest_supported) strings. + Each string can be parsed by an allmydata.util.version.Version + instance, and then compared. The first goal is to make sure that + nodes are not confused by speaking to an incompatible peer. The + second goal is to enable the development of backwards-compatibility + code. + + This method is likely to change in incompatible ways until we get the + whole compatibility scheme nailed down. + """ + return TupleOf(str, str) + def allocate_buckets(storage_index=StorageIndex, renew_secret=LeaseRenewSecret, cancel_secret=LeaseCancelSecret, diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index c8e08d25..33319864 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -1,14 +1,14 @@ -import re -from base64 import b32encode, b32decode +import re, time, sha +from base64 import b32decode from zope.interface import implements from twisted.application import service from twisted.internet import defer -from twisted.python import log from foolscap import Referenceable from allmydata import node -from allmydata.interfaces import RIIntroducer, RIIntroducerClient -from allmydata.util import observer +from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \ + RIIntroducerSubscriberClient, IIntroducerClient +from allmydata.util import observer, log, idlib class IntroducerNode(node.Node): PORTNUMFILE = "introducer.port" @@ -29,49 +29,65 @@ class IntroducerNode(node.Node): self.write_config("introducer.furl", self.introducer_url + "\n") class IntroducerService(service.MultiService, Referenceable): - implements(RIIntroducer) + implements(RIIntroducerPublisherAndSubscriberService) name = "introducer" def __init__(self, basedir=".", encoding_parameters=None): service.MultiService.__init__(self) self.introducer_url = None - self.nodes = set() - self.furls = set() + self._announcements = set() + self._subscribers = {} self._encoding_parameters = encoding_parameters - def remote_hello(self, node, furl): - log.msg("introducer: new contact at %s, node is %s" % (furl, node)) + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + def remote_publish(self, announcement): + self.log("introducer: announcement published: %s" % (announcement,) ) + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + if announcement in self._announcements: + self.log("but we already knew it, ignoring", level=log.NOISY) + return + self._announcements.add(announcement) + for s in self._subscribers.get(service_name, []): + s.callRemote("announce", set([announcement])) + + def remote_subscribe(self, subscriber, service_name): + self.log("introducer: subscription[%s] request at %s" % (service_name, + subscriber)) + if service_name not in self._subscribers: + self._subscribers[service_name] = set() + subscribers = self._subscribers[service_name] + if subscriber in subscribers: + self.log("but they're already subscribed, ignoring", + level=log.UNUSUAL) + return + subscribers.add(subscriber) def _remove(): - log.msg(" introducer: removing %s %s" % (node, furl)) - self.nodes.remove(node) - if furl is not None: - self.furls.remove(furl) - node.notifyOnDisconnect(_remove) - if furl is not None: - self.furls.add(furl) - for othernode in self.nodes: - othernode.callRemote("new_peers", set([furl])) - node.callRemote("new_peers", self.furls) - if self._encoding_parameters is not None: - node.callRemote("set_encoding_parameters", - self._encoding_parameters) - self.nodes.add(node) + self.log("introducer: unsubscribing[%s] %s" % (service_name, + subscriber)) + subscribers.remove(subscriber) + subscriber.notifyOnDisconnect(_remove) -class IntroducerClient(service.Service, Referenceable): - implements(RIIntroducerClient) + announcements = set( [ a + for a in self._announcements + if a[1] == service_name ] ) + d = subscriber.callRemote("announce", announcements) + d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) - def __init__(self, tub, introducer_furl, my_furl): - self.tub = tub - self.introducer_furl = introducer_furl - self.my_furl = my_furl + def UNKNOWN(): # TODO + if self._encoding_parameters is not None: + node.callRemote("set_encoding_parameters", + self._encoding_parameters) - self.connections = {} # k: nodeid, v: ref - self.reconnectors = {} # k: FURL, v: reconnector - self._connected = False - self.connection_observers = observer.ObserverList() - self.encoding_parameters = None +class PeerCountObserver: + # This is used by unit test code to wait until peer connections have been + # established. + def __init__(self): # The N'th element of _observers_of_enough_peers is None if nobody has # asked to be informed when N peers become connected, it is a # OneShotObserverList if someone has asked to be informed, and that list @@ -92,33 +108,7 @@ class IntroducerClient(service.Service, Referenceable): # interested in (i.e., there are never trailing Nones in # _observers_of_fewer_than_peers). self._observers_of_fewer_than_peers = [] - - def startService(self): - service.Service.startService(self) - self.introducer_reconnector = self.tub.connectTo(self.introducer_furl, - self._got_introducer) - def connect_failed(failure): - self.log("\n\nInitial Introducer connection failed: " - "perhaps it's down\n") - self.log(str(failure)) - d = self.tub.getReference(self.introducer_furl) - d.addErrback(connect_failed) - - def log(self, msg): - self.parent.log(msg) - - def remote_new_peers(self, furls): - for furl in furls: - self._new_peer(furl) - - def remote_set_encoding_parameters(self, parameters): - self.encoding_parameters = parameters - - def stopService(self): - service.Service.stopService(self) - self.introducer_reconnector.stopConnecting() - for reconnector in self.reconnectors.itervalues(): - reconnector.stopConnecting() + self.connection_observers = observer.ObserverList() def _notify_observers_of_enough_peers(self, numpeers): if len(self._observers_of_enough_peers) > numpeers: @@ -141,72 +131,6 @@ class IntroducerClient(service.Service, Referenceable): while len(self._observers_of_fewer_than_peers) > numpeers and (not self._observers_of_fewer_than_peers[-1]): self._observers_of_fewer_than_peers.pop() - def _new_peer(self, furl): - if furl in self.reconnectors: - return - # TODO: rather than using the TubID as a nodeid, we should use - # something else. The thing that requires the least additional - # mappings is to use the foolscap "identifier" (the last component of - # the furl), since these are unguessable. Before we can do that, - # though, we need a way to conveniently make these identifiers - # persist from one run of the client program to the next. Also, using - # the foolscap identifier would mean that anyone who knows the name - # of the node also has all the secrets they need to contact and use - # them, which may or may not be what we want. - m = re.match(r'pb://(\w+)@', furl) - assert m - nodeid = b32decode(m.group(1).upper()) - def _got_peer(rref): - self.log("connected to %s" % b32encode(nodeid).lower()[:8]) - self.connection_observers.notify(nodeid, rref) - self.connections[nodeid] = rref - self._notify_observers_of_enough_peers(len(self.connections)) - self._notify_observers_of_fewer_than_peers(len(self.connections)) - def _lost(): - # TODO: notifyOnDisconnect uses eventually(), but connects do - # not. Could this cause a problem? - - # We know that this observer list must have been fired, since we - # had enough peers before this one was lost. - self._remove_observers_of_enough_peers(len(self.connections)) - self._notify_observers_of_fewer_than_peers(len(self.connections)+1) - - del self.connections[nodeid] - - rref.notifyOnDisconnect(_lost) - self.log("connecting to %s" % b32encode(nodeid).lower()[:8]) - self.reconnectors[furl] = self.tub.connectTo(furl, _got_peer) - - def _got_introducer(self, introducer): - if self.my_furl: - my_furl_s = self.my_furl[6:13] - else: - my_furl_s = "" - self.log("introducing ourselves: %s, %s" % (self, my_furl_s)) - self._connected = True - d = introducer.callRemote("hello", - node=self, - furl=self.my_furl) - introducer.notifyOnDisconnect(self._disconnected) - - def _disconnected(self): - self.log("bummer, we've lost our connection to the introducer") - self._connected = False - - def notify_on_new_connection(self, cb): - """Register a callback that will be fired (with nodeid, rref) when - a new connection is established.""" - self.connection_observers.subscribe(cb) - - def connected_to_introducer(self): - return self._connected - - def get_all_peerids(self): - return self.connections.iterkeys() - - def get_all_peers(self): - return self.connections.iteritems() - def when_enough_peers(self, numpeers): """ I return a deferred that fires the next time that at least @@ -233,3 +157,297 @@ class IntroducerClient(service.Service, Referenceable): if not self._observers_of_fewer_than_peers[numpeers]: self._observers_of_fewer_than_peers[numpeers] = observer.OneShotObserverList() return self._observers_of_fewer_than_peers[numpeers].when_fired() + + def notify_on_new_connection(self, cb): + """Register a callback that will be fired (with nodeid, rref) when + a new connection is established.""" + self.connection_observers.subscribe(cb) + + def add_peer(self, ann): + self._notify_observers_of_enough_peers(len(self.connections)) + self._notify_observers_of_fewer_than_peers(len(self.connections)) + + def remove_peer(self, ann): + self._remove_observers_of_enough_peers(len(self.connections)) + self._notify_observers_of_fewer_than_peers(len(self.connections)+1) + + + +class RemoteServiceConnector: + """I hold information about a peer service that we want to connect to. If + we are connected, I hold the RemoteReference, the peer's address, and the + peer's version information. I remember information about when we were + last connected to the peer too, even if we aren't currently connected. + + @ivar announcement_time: when we first heard about this service + @ivar last_connect_time: when we last established a connection + @ivar last_loss_time: when we last lost a connection + + @ivar version: the peer's version, from the most recent connection + @ivar oldest_supported: the peer's oldest supported version, same + + @ivar rref: the RemoteReference, if connected, otherwise None + @ivar remote_host: the IAddress, if connected, otherwise None + """ + + def __init__(self, announcement, tub, ic): + self._tub = tub + self._announcement = announcement + self._ic = ic + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + + self._furl = furl + m = re.match(r'pb://(\w+)@', furl) + assert m + self._nodeid = b32decode(m.group(1).upper()) + self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid) + + self._index = (self._nodeid, service_name) + self._service_name = service_name + + self.log("attempting to connect to %s" % self._nodeid_s) + self.announcement_time = time.time() + self.last_loss_time = None + self.rref = None + self.remote_host = None + self.last_connect_time = None + self.version = None + self.oldest_supported = None + + def log(self, *args, **kwargs): + return self._ic.log(*args, **kwargs) + + def get_index(self): + return self._index + + def startConnecting(self): + self._reconnector = self._tub.connectTo(self._furl, self._got_service) + + def stopConnecting(self): + self._reconnector.stopConnecting() + + def _got_service(self, rref): + self.last_connect_time = time.time() + self.remote_host = str(rref.tracker.broker.transport.getPeer()) + + self.rref = rref + self.log("connected to %s" % self._nodeid_s) + + self._ic.add_connection(self._nodeid, self._service_name, rref) + + rref.notifyOnDisconnect(self._lost, rref) + + def _lost(self, rref): + self.log("lost connection to %s" % self._nodeid_s) + self.last_loss_time = time.time() + self.rref = None + self.remote_host = None + self._ic.remove_connection(self._nodeid, self._service_name, rref) + + + +class IntroducerClient(service.Service, Referenceable): + implements(RIIntroducerSubscriberClient, IIntroducerClient) + + def __init__(self, tub, introducer_furl, + nickname, my_version, oldest_supported): + self._tub = tub + self.introducer_furl = introducer_furl + + self._nickname = nickname + self._my_version = my_version + self._oldest_supported = oldest_supported + + self._published_announcements = set() + + self._publisher = None + self._connected = False + + self._subscribed_service_names = set() + self._subscriptions = set() # requests we've actually sent + self._received_announcements = set() + # TODO: this set will grow without bound, until the node is restarted + + # we only accept one announcement per (peerid+service_name) pair. + # This insures that an upgraded host replace their previous + # announcement. It also means that each peer must have their own Tub + # (no sharing), which is slightly weird but consistent with the rest + # of the Tahoe codebase. + self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector + # self._connections is a set of (peerid, service_name, rref) tuples + self._connections = set() + + #self.counter = PeerCountObserver() + self.counter = 0 # incremented each time we change state, for tests + self.encoding_parameters = None + + def startService(self): + service.Service.startService(self) + rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) + self._introducer_reconnector = rc + def connect_failed(failure): + self.log("Initial Introducer connection failed: perhaps it's down", + level=log.WEIRD, failure=failure) + d = self._tub.getReference(self.introducer_furl) + d.addErrback(connect_failed) + + def _got_introducer(self, publisher): + self.log("connected to introducer") + self._connected = True + self._publisher = publisher + publisher.notifyOnDisconnect(self._disconnected) + self._maybe_publish() + self._maybe_subscribe() + + def _disconnected(self): + self.log("bummer, we've lost our connection to the introducer") + self._connected = False + self._publisher = None + self._subscriptions.clear() + + def stopService(self): + service.Service.stopService(self) + self._introducer_reconnector.stopConnecting() + for rsc in self._connectors.itervalues(): + rsc.stopConnecting() + + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + + def publish(self, furl, service_name, remoteinterface_name): + ann = (furl, service_name, remoteinterface_name, + self._nickname, self._my_version, self._oldest_supported) + self._published_announcements.add(ann) + self._maybe_publish() + + def subscribe_to(self, service_name): + self._subscribed_service_names.add(service_name) + self._maybe_subscribe() + + def _maybe_subscribe(self): + if not self._publisher: + self.log("want to subscribe, but no introducer yet", + level=log.NOISY) + return + for service_name in self._subscribed_service_names: + if service_name not in self._subscriptions: + # there is a race here, but the subscription desk ignores + # duplicate requests. + self._subscriptions.add(service_name) + d = self._publisher.callRemote("subscribe", self, service_name) + d.addErrback(log.err, facility="tahoe.introducer", + level=log.WEIRD) + + def _maybe_publish(self): + if not self._publisher: + self.log("want to publish, but no introducer yet", level=log.NOISY) + return + # this re-publishes everything. The Introducer ignores duplicates + for ann in self._published_announcements: + d = self._publisher.callRemote("publish", ann) + d.addErrback(log.err, facility="tahoe.introducer", + level=log.WEIRD) + + + + def remote_announce(self, announcements): + for ann in announcements: + self.log("received %d announcements" % len(announcements)) + (furl, service_name, ri_name, nickname, ver, oldest) = ann + if service_name not in self._subscribed_service_names: + self.log("announcement for a service we don't care about [%s]" + % (service_name,), level=log.WEIRD) + continue + if ann in self._received_announcements: + self.log("ignoring old announcement: %s" % (ann,), + level=log.NOISY) + continue + self.log("new announcement[%s]: %s" % (service_name, ann)) + self._received_announcements.add(ann) + self._new_announcement(ann) + + def _new_announcement(self, announcement): + # this will only be called for new announcements + rsc = RemoteServiceConnector(announcement, self._tub, self) + index = rsc.get_index() + if index in self._connectors: + self._connectors[index].stopConnecting() + self._connectors[index] = rsc + rsc.startConnecting() + + def add_connection(self, nodeid, service_name, rref): + self._connections.add( (nodeid, service_name, rref) ) + self.counter += 1 + + def remove_connection(self, nodeid, service_name, rref): + self._connections.discard( (nodeid, service_name, rref) ) + self.counter += 1 + + + def get_all_connections(self): + return frozenset(self._connections) + + def get_all_connectors(self): + return self._connectors.copy() + + def get_all_peerids(self): + return frozenset([peerid + for (peerid, service_name, rref) + in self._connections]) + + def get_all_connections_for(self, service_name): + return frozenset([c + for c in self._connections + if c[1] == service_name]) + + def get_permuted_peers(self, service_name, key): + """Return an ordered list of (peerid, rref) tuples.""" + # TODO: flags like add-myself-at-beginning and remove-myself? maybe + # not. + + results = [] + for (c_peerid, c_service_name, rref) in self._connections: + assert isinstance(c_peerid, str) + if c_service_name != service_name: + continue + #if not include_myself and peerid == self.nodeid: + # self.log("get_permuted_peers: removing myself from the list") + # continue + permuted = sha.new(key + c_peerid).digest() + results.append((permuted, c_peerid, rref)) + + results.sort(lambda a,b: cmp(a[0], b[0])) + return [ (r[1], r[2]) for r in results ] + + def _TODO__add_ourselves(self, partial_peerlist, peerlist): + # moved here from mutable.Publish + my_peerid = self._node._client.nodeid + for (permutedid, peerid, conn) in partial_peerlist: + if peerid == my_peerid: + # we're already in there + return partial_peerlist + for (permutedid, peerid, conn) in peerlist: + if peerid == self._node._client.nodeid: + # found it + partial_peerlist.append( (permutedid, peerid, conn) ) + return partial_peerlist + self.log("we aren't in our own peerlist??", level=log.WEIRD) + return partial_peerlist + + + + def remote_set_encoding_parameters(self, parameters): + self.encoding_parameters = parameters + + def connected_to_introducer(self): + return self._connected + + def debug_disconnect_from_peerid(self, victim_nodeid): + # for unit tests: locate and sever all connections to the given + # peerid. + for (nodeid, service_name, rref) in self._connections: + if nodeid == victim_nodeid: + rref.tracker.broker.transport.loseConnection() diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index b5ee5a34..f58f54f5 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -301,16 +301,17 @@ class Retrieve: def _choose_initial_peers(self, numqueries): n = self._node - full_peerlist = n._client.get_permuted_peers(self._storage_index, - include_myself=True) + full_peerlist = n._client.get_permuted_peers("storage", + self._storage_index) + # TODO: include_myself=True + # _peerlist is a list of (peerid,conn) tuples for peers that are # worth talking too. This starts with the first numqueries in the # permuted list. If that's not enough to get us a recoverable # version, we expand this to include the first 2*total_shares peerids # (assuming we learn what total_shares is from one of the first # numqueries peers) - self._peerlist = [(p[1],p[2]) - for p in islice(full_peerlist, numqueries)] + self._peerlist = [p for p in islice(full_peerlist, numqueries)] # _peerlist_limit is the query limit we used to build this list. If # we later increase this limit, it may be useful to re-scan the # permuted list. @@ -323,33 +324,20 @@ class Retrieve: self._queries_outstanding = set() self._used_peers = set() self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..] - self._peer_storage_servers = {} dl = [] - for (peerid, conn) in peerlist: + for (peerid, ss) in peerlist: self._queries_outstanding.add(peerid) - self._do_query(conn, peerid, self._storage_index, self._read_size, - self._peer_storage_servers) + self._do_query(ss, peerid, self._storage_index, self._read_size) # control flow beyond this point: state machine. Receiving responses # from queries is the input. We might send out more queries, or we # might produce a result. return None - def _do_query(self, conn, peerid, storage_index, readsize, - peer_storage_servers): + def _do_query(self, ss, peerid, storage_index, readsize): self._queries_outstanding.add(peerid) - if peerid in peer_storage_servers: - d = defer.succeed(peer_storage_servers[peerid]) - else: - d = conn.callRemote("get_service", "storageserver") - def _got_storageserver(ss): - peer_storage_servers[peerid] = ss - return ss - d.addCallback(_got_storageserver) - d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index, - [], [(0, readsize)])) - d.addCallback(self._got_results, peerid, readsize, - (conn, storage_index, peer_storage_servers)) + d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)]) + d.addCallback(self._got_results, peerid, readsize, (ss, storage_index)) d.addErrback(self._query_failed, peerid) # errors that aren't handled by _query_failed (and errors caused by # _query_failed) get logged, but we still want to check for doneness. @@ -377,9 +365,8 @@ class Retrieve: # TODO: for MDMF, sanity-check self._read_size: don't let one # server cause us to try to read gigabytes of data from all # other servers. - (conn, storage_index, peer_storage_servers) = stuff - self._do_query(conn, peerid, storage_index, self._read_size, - peer_storage_servers) + (ss, storage_index) = stuff + self._do_query(ss, peerid, storage_index, self._read_size) return except CorruptShareError, e: # log it and give the other shares a chance to be processed @@ -514,19 +501,19 @@ class Retrieve: self.log("search_distance=%d" % search_distance, level=log.UNUSUAL) if self._peerlist_limit < search_distance: # we might be able to get some more peers from the list - peers = self._node._client.get_permuted_peers(self._storage_index, - include_myself=True) - self._peerlist = [(p[1],p[2]) - for p in islice(peers, search_distance)] + peers = self._node._client.get_permuted_peers("storage", + self._storage_index) + # TODO: include_myself=True + self._peerlist = [p for p in islice(peers, search_distance)] self._peerlist_limit = search_distance self.log("added peers, peerlist=%d, peerlist_limit=%d" % (len(self._peerlist), self._peerlist_limit), level=log.UNUSUAL) # are there any peers on the list that we haven't used? new_query_peers = [] - for (peerid, conn) in self._peerlist: + for (peerid, ss) in self._peerlist: if peerid not in self._used_peers: - new_query_peers.append( (peerid, conn) ) + new_query_peers.append( (peerid, ss) ) if len(new_query_peers) > 5: # only query in batches of 5. TODO: this is pretty # arbitrary, really I want this to be something like @@ -535,10 +522,8 @@ class Retrieve: if new_query_peers: self.log("sending %d new queries (read %d bytes)" % (len(new_query_peers), self._read_size), level=log.UNUSUAL) - for (peerid, conn) in new_query_peers: - self._do_query(conn, peerid, - self._storage_index, self._read_size, - self._peer_storage_servers) + for (peerid, ss) in new_query_peers: + self._do_query(ss, peerid, self._storage_index, self._read_size) # we'll retrigger when those queries come back return @@ -803,26 +788,27 @@ class Publish: # the share we use for ourselves didn't count against the N total.. # maybe use N+1 if we find ourselves in the permuted list? - peerlist = self._node._client.get_permuted_peers(storage_index, - include_myself=True) + peerlist = self._node._client.get_permuted_peers("storage", + storage_index) + # make sure our local server is in the list + # TODO: include_myself_at_beginning=True current_share_peers = DictOfSets() reachable_peers = {} - # list of (peerid, offset, length) where the encprivkey might be found + # list of (peerid, shnum, offset, length) where the encprivkey might + # be found self._encprivkey_shares = [] EPSILON = total_shares / 2 #partial_peerlist = islice(peerlist, total_shares + EPSILON) partial_peerlist = peerlist[:total_shares+EPSILON] - # make sure our local server is in the list - partial_peerlist = self._add_ourselves(partial_peerlist, peerlist) + self._storage_servers = {} - peer_storage_servers = {} dl = [] - for (permutedid, peerid, conn) in partial_peerlist: - d = self._do_query(conn, peerid, peer_storage_servers, - storage_index) + for permutedid, (peerid, ss) in enumerate(partial_peerlist): + self._storage_servers[peerid] = ss + d = self._do_query(ss, peerid, storage_index) d.addCallback(self._got_query_results, peerid, permutedid, reachable_peers, current_share_peers) @@ -830,7 +816,7 @@ class Publish: d = defer.DeferredList(dl) d.addCallback(self._got_all_query_results, total_shares, reachable_peers, - current_share_peers, peer_storage_servers) + current_share_peers) # TODO: add an errback to, probably to ignore that peer # TODO: if we can't get a privkey from these servers, consider # looking farther afield. Make sure we include ourselves in the @@ -839,28 +825,10 @@ class Publish: # but ourselves. return d - def _add_ourselves(self, partial_peerlist, peerlist): - my_peerid = self._node._client.nodeid - for (permutedid, peerid, conn) in partial_peerlist: - if peerid == my_peerid: - # we're already in there - return partial_peerlist - for (permutedid, peerid, conn) in peerlist: - if peerid == self._node._client.nodeid: - # found it - partial_peerlist.append( (permutedid, peerid, conn) ) - return partial_peerlist - self.log("we aren't in our own peerlist??", level=log.WEIRD) - return partial_peerlist - - def _do_query(self, conn, peerid, peer_storage_servers, storage_index): + def _do_query(self, ss, peerid, storage_index): self.log("querying %s" % idlib.shortnodeid_b2a(peerid)) - d = conn.callRemote("get_service", "storageserver") - def _got_storageserver(ss): - peer_storage_servers[peerid] = ss - return ss.callRemote("slot_readv", - storage_index, [], [(0, self._read_size)]) - d.addCallback(_got_storageserver) + d = ss.callRemote("slot_readv", + storage_index, [], [(0, self._read_size)]) return d def _got_query_results(self, datavs, peerid, permutedid, @@ -927,7 +895,7 @@ class Publish: # files (since the privkey will be small enough to fit in the # write cap). - self._encprivkey_shares.append( (peerid, shnum, offset, length) ) + self._encprivkey_shares.append( (peerid, shnum, offset, length)) return (seqnum, root_hash, IV, k, N, segsize, datalen, @@ -954,7 +922,7 @@ class Publish: def _got_all_query_results(self, res, total_shares, reachable_peers, - current_share_peers, peer_storage_servers): + current_share_peers): self.log("_got_all_query_results") # now that we know everything about the shares currently out there, # decide where to place the new shares. @@ -1019,7 +987,7 @@ class Publish: assert not shares_needing_homes - target_info = (target_map, shares_per_peer, peer_storage_servers) + target_info = (target_map, shares_per_peer) return target_info def _obtain_privkey(self, target_info): @@ -1032,16 +1000,16 @@ class Publish: # peers one at a time until we get a copy. Only bother asking peers # who've admitted to holding a share. - target_map, shares_per_peer, peer_storage_servers = target_info + target_map, shares_per_peer = target_info # pull shares from self._encprivkey_shares if not self._encprivkey_shares: raise NotEnoughPeersError("Unable to find a copy of the privkey") (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0) + ss = self._storage_servers[peerid] self.log("trying to obtain privkey from %s shnum %d" % (idlib.shortnodeid_b2a(peerid), shnum)) - d = self._do_privkey_query(peer_storage_servers[peerid], peerid, - shnum, offset, length) + d = self._do_privkey_query(ss, peerid, shnum, offset, length) d.addErrback(self.log_err) d.addCallback(lambda res: self._obtain_privkey(target_info)) return d @@ -1174,7 +1142,7 @@ class Publish: # surprises here are *not* indications of UncoordinatedWriteError, # and we'll need to respond to them more gracefully.) - target_map, shares_per_peer, peer_storage_servers = target_info + target_map, shares_per_peer = target_info my_checkstring = pack_checkstring(seqnum, root_hash, IV) peer_messages = {} @@ -1206,7 +1174,7 @@ class Publish: cancel_secret = self._node.get_cancel_secret(peerid) secrets = (write_enabler, renew_secret, cancel_secret) - d = self._do_testreadwrite(peerid, peer_storage_servers, secrets, + d = self._do_testreadwrite(peerid, secrets, tw_vectors, read_vector) d.addCallback(self._got_write_answer, tw_vectors, my_checkstring, peerid, expected_old_shares[peerid], dispatch_map) @@ -1216,16 +1184,16 @@ class Publish: d.addCallback(lambda res: (self._surprised, dispatch_map)) return d - def _do_testreadwrite(self, peerid, peer_storage_servers, secrets, + def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): - conn = peer_storage_servers[peerid] storage_index = self._node._uri.storage_index + ss = self._storage_servers[peerid] - d = conn.callRemote("slot_testv_and_readv_and_writev", - storage_index, - secrets, - tw_vectors, - read_vector) + d = ss.callRemote("slot_testv_and_readv_and_writev", + storage_index, + secrets, + tw_vectors, + read_vector) return d def _got_write_answer(self, answer, tw_vectors, my_checkstring, diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 6e61087c..4da21e2c 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -50,10 +50,8 @@ class CHKCheckerAndUEBFetcher: def _get_all_shareholders(self, storage_index): dl = [] - for (pmpeerid, peerid, connection) in self._peer_getter(storage_index): - d = connection.callRemote("get_service", "storageserver") - d.addCallback(lambda ss: ss.callRemote("get_buckets", - storage_index)) + for (peerid, ss) in self._peer_getter("storage", storage_index): + d = ss.callRemote("get_buckets", storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peerid,)) dl.append(d) diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 49fa7eae..c50aa58a 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -11,6 +11,7 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ BadWriteEnablerError, IStatsProducer from allmydata.util import fileutil, idlib, mathutil, log from allmydata.util.assertutil import precondition, _assert +import allmydata # for __version__ class DataTooLargeError(Exception): pass @@ -669,14 +670,20 @@ class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer, IStatsProducer) name = 'storageserver' - def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None): + # we're pretty narrow-minded right now + OLDEST_SUPPORTED_VERSION = allmydata.__version__ + + def __init__(self, storedir, sizelimit=None, + discard_storage=False, readonly_storage=False, + stats_provider=None): service.MultiService.__init__(self) self.storedir = storedir sharedir = os.path.join(storedir, "shares") fileutil.make_dirs(sharedir) self.sharedir = sharedir self.sizelimit = sizelimit - self.no_storage = no_storage + self.no_storage = discard_storage + self.readonly_storage = readonly_storage self.stats_provider = stats_provider if self.stats_provider: self.stats_provider.register_producer(self) @@ -684,12 +691,17 @@ class StorageServer(service.MultiService, Referenceable): self._clean_incomplete() fileutil.make_dirs(self.incomingdir) self._active_writers = weakref.WeakKeyDictionary() + lp = log.msg("StorageServer created, now measuring space..", + facility="tahoe.storage") self.measure_size() + log.msg(format="space measurement done, consumed=%(consumed)d bytes", + consumed=self.consumed, + parent=lp, facility="tahoe.storage") def log(self, *args, **kwargs): - if self.parent: - return self.parent.log(*args, **kwargs) - return + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.storage" + return log.msg(*args, **kwargs) def setNodeID(self, nodeid): # somebody must set this before any slots can be created or leases @@ -720,6 +732,9 @@ class StorageServer(service.MultiService, Referenceable): space += bw.allocated_size() return space + def remote_get_versions(self): + return (str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)) + def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, @@ -754,6 +769,10 @@ class StorageServer(service.MultiService, Referenceable): sf = ShareFile(fn) sf.add_or_renew_lease(lease_info) + if self.readonly_storage: + # we won't accept new shares + return alreadygot, bucketwriters + for shnum in sharenums: incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum) diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 764d25bf..2cab1d49 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -12,10 +12,12 @@ from foolscap.eventual import flushEventualQueue class FakeIntroducerClient(introducer.IntroducerClient): def __init__(self): - self.connections = {} - -def permute(c, key): - return [ y for x, y, z in c.get_permuted_peers(key) ] + self._connections = set() + def add_peer(self, nodeid): + entry = (nodeid, "storage", "rref") + self._connections.add(entry) + def remove_all_peers(self): + self._connections.clear() class Basic(unittest.TestCase): def test_loadable(self): @@ -94,6 +96,10 @@ class Basic(unittest.TestCase): self.failUnlessEqual(c.getServiceNamed("storageserver").sizelimit, None) + def _permute(self, c, key): + return [ peerid + for (peerid,rref) in c.get_permuted_peers("storage", key) ] + def test_permute(self): basedir = "test_client.Basic.test_permute" os.mkdir(basedir) @@ -102,17 +108,18 @@ class Basic(unittest.TestCase): c = client.Client(basedir) c.introducer_client = FakeIntroducerClient() for k in ["%d" % i for i in range(5)]: - c.introducer_client.connections[k] = None - self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2']) - self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3']) - c.introducer_client.connections.clear() - self.failUnlessEqual(permute(c, "one"), []) + c.introducer_client.add_peer(k) + + self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2']) + self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3']) + c.introducer_client.remove_all_peers() + self.failUnlessEqual(self._permute(c, "one"), []) c2 = client.Client(basedir) c2.introducer_client = FakeIntroducerClient() for k in ["%d" % i for i in range(5)]: - c2.introducer_client.connections[k] = None - self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2']) + c2.introducer_client.add_peer(k) + self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2']) def test_versions(self): basedir = "test_client.Basic.test_versions" @@ -120,7 +127,8 @@ class Basic(unittest.TestCase): open(os.path.join(basedir, "introducer.furl"), "w").write("") open(os.path.join(basedir, "vdrive.furl"), "w").write("") c = client.Client(basedir) - mine, oldest = c.remote_get_versions() + ss = c.getServiceNamed("storageserver") + mine, oldest = ss.remote_get_versions() self.failUnlessEqual(mine, str(allmydata.__version__)) self.failIfEqual(str(allmydata.__version__), "unknown") self.failUnless("." in str(allmydata.__version__), diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 85995124..7ce1293b 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -43,7 +43,7 @@ class FakeClient(service.MultiService): return True def get_encoding_parameters(self): return self.DEFAULT_ENCODING_PARAMETERS - def get_permuted_peers(self, storage_index): + def get_permuted_peers(self, service_name, storage_index): return [] def flush_but_dont_ignore(res): diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 925acd0d..5f154b4b 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -1,9 +1,9 @@ -from base64 import b32encode +from base64 import b32decode import os from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.python import log from foolscap import Tub, Referenceable @@ -66,10 +66,8 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin): def test_create(self): - ic = IntroducerClient(None, "introducer", "myfurl") - def _ignore(nodeid, rref): - pass - ic.notify_on_new_connection(_ignore) + ic = IntroducerClient(None, "introducer.furl", "my_nickname", + "my_version", "oldest_version") def test_listen(self): i = IntroducerService() @@ -87,7 +85,7 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin): i = IntroducerService() i.setServiceParent(self.parent) - iurl = tub.registerReference(i) + introducer_furl = tub.registerReference(i) NUMCLIENTS = 5 # we have 5 clients who publish themselves, and an extra one which # does not. When the connections are fully established, all six nodes @@ -106,71 +104,82 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin): n = FakeNode() log.msg("creating client %d: %s" % (i, tub.getShortTubID())) + c = IntroducerClient(tub, introducer_furl, + "nickname-%d" % i, "version", "oldest") if i < NUMCLIENTS: node_furl = tub.registerReference(n) - else: - node_furl = None - c = IntroducerClient(tub, iurl, node_furl) + c.publish(node_furl, "storage", "ri_name") + # the last one does not publish anything + + c.subscribe_to("storage") c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub - def _wait_for_all_connections(res): - dl = [] # list of when_enough_peers() for each peer - # will fire once everybody is connected + def _wait_for_all_connections(): for c in clients: - dl.append(c.when_enough_peers(NUMCLIENTS)) - return defer.DeferredList(dl, fireOnOneErrback=True) - - d = _wait_for_all_connections(None) + if len(c.get_all_connections()) < NUMCLIENTS: + return False + return True + d = self.poll(_wait_for_all_connections, timeout=5) def _check1(res): log.msg("doing _check1") for c in clients: - self.failUnlessEqual(len(c.connections), NUMCLIENTS) - self.failUnless(c._connected) # to the introducer + self.failUnless(c.connected_to_introducer()) + self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS) + self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS) + self.failUnlessEqual(len(c.get_all_connections_for("storage")), + NUMCLIENTS) d.addCallback(_check1) + origin_c = clients[0] def _disconnect_somebody_else(res): # now disconnect somebody's connection to someone else - # find a target that is not themselves - for nodeid,rref in origin_c.connections.items(): - if b32encode(nodeid).lower() != tubs[origin_c].tubID: - victim = rref - break - log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim)) - victim.tracker.broker.transport.loseConnection() + current_counter = origin_c.counter + victim_nodeid = b32decode(tubs[clients[1]].tubID.upper()) + log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, + victim_nodeid)) + origin_c.debug_disconnect_from_peerid(victim_nodeid) log.msg(" did disconnect") + + # then wait until something changes, which ought to be them + # noticing the loss + def _compare(): + return current_counter != origin_c.counter + return self.poll(_compare, timeout=5) + d.addCallback(_disconnect_somebody_else) - def _wait_til_he_notices(res): - # wait til the origin_c notices the loss - log.msg(" waiting until peer notices the disconnection") - return origin_c.when_fewer_than_peers(NUMCLIENTS) - d.addCallback(_wait_til_he_notices) - d.addCallback(_wait_for_all_connections) + + # and wait for them to reconnect + d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5)) def _check2(res): log.msg("doing _check2") for c in clients: - self.failUnlessEqual(len(c.connections), NUMCLIENTS) + self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS) d.addCallback(_check2) + def _disconnect_yourself(res): # now disconnect somebody's connection to themselves. - # find a target that *is* themselves - for nodeid,rref in origin_c.connections.items(): - if b32encode(nodeid).lower() == tubs[origin_c].tubID: - victim = rref - break - log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim)) - victim.tracker.broker.transport.loseConnection() + current_counter = origin_c.counter + victim_nodeid = b32decode(tubs[clients[0]].tubID.upper()) + log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, + victim_nodeid)) + origin_c.debug_disconnect_from_peerid(victim_nodeid) log.msg(" did disconnect from self") + + def _compare(): + return current_counter != origin_c.counter + return self.poll(_compare, timeout=5) d.addCallback(_disconnect_yourself) - d.addCallback(_wait_til_he_notices) - d.addCallback(_wait_for_all_connections) + + d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5)) def _check3(res): log.msg("doing _check3") for c in clients: - self.failUnlessEqual(len(c.connections), NUMCLIENTS) + self.failUnlessEqual(len(c.get_all_connections_for("storage")), + NUMCLIENTS) d.addCallback(_check3) def _shutdown_introducer(res): # now shut down the introducer. We do this by shutting down the @@ -180,100 +189,19 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin): log.msg("shutting down the introducer") return self.central_tub.disownServiceParent() d.addCallback(_shutdown_introducer) - d.addCallback(self.stall, 2) + def _wait_for_introducer_loss(): + for c in clients: + if c.connected_to_introducer(): + return False + return True + d.addCallback(lambda res: self.poll(_wait_for_introducer_loss, timeout=5)) + def _check4(res): log.msg("doing _check4") for c in clients: - self.failUnlessEqual(len(c.connections), NUMCLIENTS) - self.failIf(c._connected) + self.failUnlessEqual(len(c.get_all_connections_for("storage")), + NUMCLIENTS) + self.failIf(c.connected_to_introducer()) d.addCallback(_check4) return d - test_system.timeout = 2400 - - def stall(self, res, timeout): - d = defer.Deferred() - reactor.callLater(timeout, d.callback, res) - return d - - def test_system_this_one_breaks(self): - # this uses a single Tub, which has a strong effect on the - # failingness - tub = Tub() - tub.setOption("logLocalFailures", True) - tub.setOption("logRemoteFailures", True) - tub.setServiceParent(self.parent) - l = tub.listenOn("tcp:0") - portnum = l.getPortnum() - tub.setLocation("localhost:%d" % portnum) - - i = IntroducerService() - i.setServiceParent(self.parent) - iurl = tub.registerReference(i) - - clients = [] - for i in range(5): - n = FakeNode() - node_furl = tub.registerReference(n) - c = IntroducerClient(tub, iurl, node_furl) - c.setServiceParent(self.parent) - clients.append(c) - - # time passes.. - d = defer.Deferred() - def _check(res): - log.msg("doing _check") - self.failUnlessEqual(len(clients[0].connections), 5) - d.addCallback(_check) - reactor.callLater(2, d.callback, None) - return d - del test_system_this_one_breaks - - - def test_system_this_one_breaks_too(self): - # this one shuts down so quickly that it fails in a different way - self.central_tub = tub = Tub() - tub.setOption("logLocalFailures", True) - tub.setOption("logRemoteFailures", True) - tub.setServiceParent(self.parent) - l = tub.listenOn("tcp:0") - portnum = l.getPortnum() - tub.setLocation("localhost:%d" % portnum) - - i = IntroducerService() - i.setServiceParent(self.parent) - iurl = tub.registerReference(i) - - clients = [] - for i in range(5): - tub = Tub() - tub.setOption("logLocalFailures", True) - tub.setOption("logRemoteFailures", True) - tub.setServiceParent(self.parent) - l = tub.listenOn("tcp:0") - portnum = l.getPortnum() - tub.setLocation("localhost:%d" % portnum) - n = FakeNode() - node_furl = tub.registerReference(n) - c = IntroducerClient(tub, iurl, node_furl) - c.setServiceParent(self.parent) - clients.append(c) - - # time passes.. - d = defer.Deferred() - reactor.callLater(0.01, d.callback, None) - def _check(res): - log.msg("doing _check") - self.fail("BOOM") - for c in clients: - self.failUnlessEqual(len(c.connections), 5) - c.connections.values()[0].tracker.broker.transport.loseConnection() - return self.stall(None, 2) - d.addCallback(_check) - def _check_again(res): - log.msg("doing _check_again") - for c in clients: - self.failUnlessEqual(len(c.connections), 5) - d.addCallback(_check_again) - return d - del test_system_this_one_breaks_too diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index ecea762d..0d3460f5 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -47,12 +47,12 @@ class FakeFilenode(mutable.MutableFileNode): return defer.succeed(None) class FakePublish(mutable.Publish): - def _do_query(self, conn, peerid, peer_storage_servers, storage_index): - assert conn[0] == peerid + def _do_query(self, ss, peerid, storage_index): + assert ss[0] == peerid shares = self._peers[peerid] return defer.succeed(shares) - def _do_testreadwrite(self, peerid, peer_storage_servers, secrets, + def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): # always-pass: parrot the test vectors back to them. readv = {} @@ -113,9 +113,10 @@ class FakeClient: res = FakeFilenode(self).init_from_uri(u) return res - def get_permuted_peers(self, key, include_myself=True): + def get_permuted_peers(self, service_name, key): + # TODO: include_myself=True """ - @return: list of (permuted-peerid, peerid, connection,) + @return: list of (peerid, connection,) """ peers_and_connections = [(pid, (pid,)) for pid in self._peerids] results = [] @@ -124,6 +125,7 @@ class FakeClient: permuted = sha.new(key + peerid).digest() results.append((permuted, peerid, connection)) results.sort() + results = [ (r[1],r[2]) for r in results] return results def upload(self, uploadable): @@ -299,7 +301,7 @@ class Publish(unittest.TestCase): total_shares = 10 d = p._query_peers(total_shares) def _done(target_info): - (target_map, shares_per_peer, peer_storage_servers) = target_info + (target_map, shares_per_peer) = target_info shares_per_peer = {} for shnum in target_map: for (peerid, old_seqnum, old_R) in target_map[shnum]: @@ -321,7 +323,7 @@ class Publish(unittest.TestCase): total_shares = 10 d = p._query_peers(total_shares) def _done(target_info): - (target_map, shares_per_peer, peer_storage_servers) = target_info + (target_map, shares_per_peer) = target_info shares_per_peer = {} for shnum in target_map: for (peerid, old_seqnum, old_R) in target_map[shnum]: diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 8bcb7069..a9c90d68 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -32,7 +32,7 @@ This is some data to publish to the virtual drive, which needs to be large enough to not fit inside a LIT uri. """ -class SystemTest(testutil.SignalMixin, unittest.TestCase): +class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): def setUp(self): self.sparent = service.MultiService() @@ -135,18 +135,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(lambda res: c) return d + def _check_connections(self): + for c in self.clients: + ic = c.introducer_client + if not ic.connected_to_introducer(): + return False + if len(ic.get_all_peerids()) != self.numclients: + return False + return True + def wait_for_connections(self, ignored=None): # TODO: replace this with something that takes a list of peerids and # fires when they've all been heard from, instead of using a count # and a threshold - for c in self.clients: - if (not c.introducer_client or - len(list(c.get_all_peerids())) != self.numclients): - d = defer.Deferred() - d.addCallback(self.wait_for_connections) - reactor.callLater(0.05, d.callback, None) - return d - return defer.succeed(None) + return self.poll(self._check_connections, timeout=200) def test_connections(self): self.basedir = "system/SystemTest/test_connections" @@ -158,10 +160,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): for c in self.clients: all_peerids = list(c.get_all_peerids()) self.failUnlessEqual(len(all_peerids), self.numclients+1) - permuted_peers = list(c.get_permuted_peers("a", True)) + permuted_peers = list(c.get_permuted_peers("storage", "a")) self.failUnlessEqual(len(permuted_peers), self.numclients+1) - permuted_other_peers = list(c.get_permuted_peers("a", False)) - self.failUnlessEqual(len(permuted_other_peers), self.numclients) d.addCallback(_check) def _shutdown_extra_node(res): @@ -196,10 +196,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): for c in self.clients: all_peerids = list(c.get_all_peerids()) self.failUnlessEqual(len(all_peerids), self.numclients) - permuted_peers = list(c.get_permuted_peers("a", True)) + permuted_peers = list(c.get_permuted_peers("storage", "a")) self.failUnlessEqual(len(permuted_peers), self.numclients) - permuted_other_peers = list(c.get_permuted_peers("a", False)) - self.failUnlessEqual(len(permuted_other_peers), self.numclients-1) d.addCallback(_check_connections) def _do_upload(res): log.msg("UPLOADING") @@ -266,8 +264,12 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): def _download_nonexistent_uri(res): baduri = self.mangle_uri(self.uri) + log.msg("about to download non-existent URI", level=log.UNUSUAL, + facility="tahoe.tests") d1 = self.downloader.download_to_data(baduri) def _baduri_should_fail(res): + log.msg("finished downloading non-existend URI", + level=log.UNUSUAL, facility="tahoe.tests") self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError), "expected NotEnoughPeersError, got %s" % res) @@ -834,9 +836,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(self.log, "GOT WEB LISTENER") return d - def log(self, res, msg): + def log(self, res, msg, **kwargs): # print "MSG: %s RES: %s" % (msg, res) - log.msg(msg) + log.msg(msg, **kwargs) return res def stall(self, res, delay=1.0): @@ -1064,7 +1066,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(_got_from_uri) # download from a bogus URI, make sure we get a reasonable error - d.addCallback(self.log, "_get_from_bogus_uri") + d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL) def _get_from_bogus_uri(res): d1 = getPage(base + "uri/%s?filename=%s" % (self.mangle_uri(self.uri), "mydata567")) @@ -1072,6 +1074,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): "410") return d1 d.addCallback(_get_from_bogus_uri) + d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL) # upload a file with PUT d.addCallback(self.log, "about to try PUT") @@ -1364,7 +1367,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): peers = set() for shpeers in sharemap.values(): peers.update(shpeers) - self.failUnlessEqual(len(peers), self.numclients-1) + self.failUnlessEqual(len(peers), self.numclients) d.addCallback(_check_checker_results) def _check_stored_results(res): diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index c402d08a..0bda732b 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -3,7 +3,6 @@ import os from twisted.trial import unittest from twisted.python.failure import Failure from twisted.python import log -from twisted.internet import defer from cStringIO import StringIO from allmydata import upload, encode, uri @@ -69,20 +68,6 @@ class Uploadable(unittest.TestCase): d.addCallback(lambda res: u.close()) return d -class FakePeer: - def __init__(self, mode="good"): - self.ss = FakeStorageServer(mode) - - def callRemote(self, methname, *args, **kwargs): - def _call(): - meth = getattr(self, methname) - return meth(*args, **kwargs) - return defer.maybeDeferred(_call) - - def get_service(self, sname): - assert sname == "storageserver" - return self.ss - class FakeStorageServer: def __init__(self, mode): self.mode = mode @@ -155,9 +140,9 @@ class FakeClient: def log(self, *args, **kwargs): pass def get_permuted_peers(self, storage_index, include_myself): - peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),) + peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(self.num_servers) ] - self.last_peers = [p[2] for p in peers] + self.last_peers = [p[1] for p in peers] return peers def get_push_to_ourselves(self): return None @@ -353,9 +338,9 @@ class PeerSelection(unittest.TestCase): d.addCallback(self._check_large, SIZE_LARGE) def _check(res): for p in self.node.last_peers: - allocated = p.ss.allocated + allocated = p.allocated self.failUnlessEqual(len(allocated), 1) - self.failUnlessEqual(p.ss.queries, 1) + self.failUnlessEqual(p.queries, 1) d.addCallback(_check) return d @@ -370,9 +355,9 @@ class PeerSelection(unittest.TestCase): d.addCallback(self._check_large, SIZE_LARGE) def _check(res): for p in self.node.last_peers: - allocated = p.ss.allocated + allocated = p.allocated self.failUnlessEqual(len(allocated), 2) - self.failUnlessEqual(p.ss.queries, 2) + self.failUnlessEqual(p.queries, 2) d.addCallback(_check) return d @@ -389,13 +374,13 @@ class PeerSelection(unittest.TestCase): got_one = [] got_two = [] for p in self.node.last_peers: - allocated = p.ss.allocated + allocated = p.allocated self.failUnless(len(allocated) in (1,2), len(allocated)) if len(allocated) == 1: - self.failUnlessEqual(p.ss.queries, 1) + self.failUnlessEqual(p.queries, 1) got_one.append(p) else: - self.failUnlessEqual(p.ss.queries, 2) + self.failUnlessEqual(p.queries, 2) got_two.append(p) self.failUnlessEqual(len(got_one), 49) self.failUnlessEqual(len(got_two), 1) @@ -414,9 +399,9 @@ class PeerSelection(unittest.TestCase): d.addCallback(self._check_large, SIZE_LARGE) def _check(res): for p in self.node.last_peers: - allocated = p.ss.allocated + allocated = p.allocated self.failUnlessEqual(len(allocated), 4) - self.failUnlessEqual(p.ss.queries, 2) + self.failUnlessEqual(p.queries, 2) d.addCallback(_check) return d @@ -432,7 +417,7 @@ class PeerSelection(unittest.TestCase): def _check(res): counts = {} for p in self.node.last_peers: - allocated = p.ss.allocated + allocated = p.allocated counts[len(allocated)] = counts.get(len(allocated), 0) + 1 histogram = [counts.get(i, 0) for i in range(5)] self.failUnlessEqual(histogram, [0,0,0,2,1]) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index ed147f16..78fa8cb5 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -44,18 +44,17 @@ class TooFullError(Exception): EXTENSION_SIZE = 1000 class PeerTracker: - def __init__(self, peerid, permutedid, connection, + def __init__(self, peerid, storage_server, sharesize, blocksize, num_segments, num_share_hashes, storage_index, bucket_renewal_secret, bucket_cancel_secret): precondition(isinstance(peerid, str), peerid) precondition(len(peerid) == 20, peerid) self.peerid = peerid - self.permutedid = permutedid - self.connection = connection # to an RIClient + self._storageserver = storage_server # to an RIStorageServer self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize - #print "PeerTracker", peerid, permutedid, sharesize + #print "PeerTracker", peerid, sharesize as = storage.allocated_size(sharesize, num_segments, num_share_hashes, @@ -66,7 +65,6 @@ class PeerTracker: self.num_segments = num_segments self.num_share_hashes = num_share_hashes self.storage_index = storage_index - self._storageserver = None self.renew_secret = bucket_renewal_secret self.cancel_secret = bucket_cancel_secret @@ -77,15 +75,6 @@ class PeerTracker: idlib.b2a(self.storage_index)[:6])) def query(self, sharenums): - if not self._storageserver: - d = self.connection.callRemote("get_service", "storageserver") - d.addCallback(self._got_storageserver) - d.addCallback(lambda res: self._query(sharenums)) - return d - return self._query(sharenums) - def _got_storageserver(self, storageserver): - self._storageserver = storageserver - def _query(self, sharenums): #print " query", self.peerid, len(sharenums) d = self._storageserver.callRemote("allocate_buckets", self.storage_index, @@ -144,7 +133,8 @@ class Tahoe2PeerSelector: self.use_peers = set() # PeerTrackers that have shares assigned to them self.preexisting_shares = {} # sharenum -> PeerTracker holding the share - peers = client.get_permuted_peers(storage_index, push_to_ourselves) + peers = client.get_permuted_peers("storage", storage_index) + # TODO: push_to_ourselves if not peers: raise encode.NotEnoughPeersError("client gave us zero peers") @@ -167,7 +157,7 @@ class Tahoe2PeerSelector: file_cancel_secret = file_cancel_secret_hash(client_cancel_secret, storage_index) - trackers = [ PeerTracker(peerid, permutedid, conn, + trackers = [ PeerTracker(peerid, conn, share_size, block_size, num_segments, num_share_hashes, storage_index, @@ -176,7 +166,7 @@ class Tahoe2PeerSelector: bucket_cancel_secret_hash(file_cancel_secret, peerid), ) - for permutedid, peerid, conn in peers ] + for (peerid, conn) in peers ] self.uncontacted_peers = trackers d = defer.maybeDeferred(self._loop) -- 2.45.2