# messages (or if we used promises).
found = set()
for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
- buckets = connection.get_service("storageserver").get_buckets(si)
+ buckets = connection.get_buckets(si)
found.update(buckets.keys())
return len(found)
'''
def _get_all_shareholders(self, storage_index):
dl = []
- for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
- d = connection.callRemote("get_service", "storageserver")
- d.addCallback(lambda ss: ss.callRemote("get_buckets",
- storage_index))
+ for (peerid, ss) in self.peer_getter("storage", storage_index):
+ d = ss.callRemote("get_buckets", storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
dl.append(d)
-import os, sha, stat, time, re
-from foolscap import Referenceable
-from zope.interface import implements
-from allmydata.interfaces import RIClient
+import os, stat, time, re
+from allmydata.interfaces import RIStorageServer
from allmydata import node
from twisted.internet import reactor
TiB=1024*GiB
PiB=1024*TiB
-class Client(node.Node, Referenceable, testutil.PollMixin):
- implements(RIClient)
+class Client(node.Node, testutil.PollMixin):
PORTNUMFILE = "client.port"
STOREDIR = 'storage'
NODETYPE = "client"
# that we will abort an upload unless we can allocate space for at least
# this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
- DEFAULT_ENCODING_PARAMETERS = {"k":25,
- "happy": 75,
- "n": 100,
+ DEFAULT_ENCODING_PARAMETERS = {"k": 3,
+ "happy": 7,
+ "n": 10,
"max_segment_size": 1*MiB,
}
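+    # Illustrative arithmetic (hypothetical file size, not part of this
+    # change): with k=3 and n=10, a 9MiB file is encoded into 10 shares of
+    # roughly 3MiB each, and any 3 of them suffice to reconstruct it. The
+    # upload is aborted unless homes are found for at least 'happy' (7)
+    # of the shares.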
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.logSource="Client"
- self.my_furl = None
- self.introducer_client = None
+ self.nickname = self.get_config("nickname")
+ if self.nickname is None:
+ self.nickname = "<unspecified>"
+ self.init_introducer_client()
self.init_stats_provider()
self.init_lease_secret()
self.init_storage()
self.add_service(Checker())
# ControlServer and Helper are attached after Tub startup
- self.introducer_furl = self.get_config("introducer.furl", required=True)
-
hotline_file = os.path.join(self.basedir,
self.SUICIDE_PREVENTION_HOTLINE_FILE)
if os.path.exists(hotline_file):
if webport:
self.init_web(webport) # strports string
+ def init_introducer_client(self):
+ self.introducer_furl = self.get_config("introducer.furl", required=True)
+ ic = IntroducerClient(self.tub, self.introducer_furl,
+ self.nickname,
+ str(allmydata.__version__),
+ str(self.OLDEST_SUPPORTED_VERSION))
+ self.introducer_client = ic
+ ic.setServiceParent(self)
+ # nodes that want to upload and download will need storage servers
+ ic.subscribe_to("storage")
+
def init_stats_provider(self):
gatherer_furl = self.get_config('stats_gatherer.furl')
if gatherer_furl:
self._lease_secret = idlib.a2b(secret_s)
def init_storage(self):
+ # should we run a storage server (and publish it for others to use)?
+ provide_storage = (self.get_config("no_storage") is None)
+ if not provide_storage:
+ return
+ readonly_storage = (self.get_config("readonly_storage") is not None)
+
storedir = os.path.join(self.basedir, self.STOREDIR)
sizelimit = None
"G": 1000 * 1000 * 1000,
}[suffix]
sizelimit = int(number) * multiplier
- no_storage = self.get_config("debug_no_storage") is not None
- self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider))
+ discard_storage = self.get_config("debug_discard_storage") is not None
+ ss = StorageServer(storedir, sizelimit,
+ discard_storage, readonly_storage,
+ self.stats_provider)
+ self.add_service(ss)
+ d = self.when_tub_ready()
+ # we can't do registerReference until the Tub is ready
+ def _publish(res):
+ furl_file = os.path.join(self.basedir, "private", "storage.furl")
+ furl = self.tub.registerReference(ss, furlFile=furl_file)
+ ri_name = RIStorageServer.__remote_name__
+ self.introducer_client.publish(furl, "storage", ri_name)
+ d.addCallback(_publish)
+ d.addErrback(log.err, facility="tahoe.storage", level=log.BAD)
+
def init_options(self):
self.push_to_ourselves = None
self.log("tub_ready")
node.Node.tub_ready(self)
- furl_file = os.path.join(self.basedir, "myself.furl")
- self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
-
- # should we publish ourselves as a server?
- provide_storage = (self.get_config("no_storage") is None)
- if provide_storage:
- my_furl = self.my_furl
- else:
- my_furl = None
-
- ic = IntroducerClient(self.tub, self.introducer_furl, my_furl)
- self.introducer_client = ic
- ic.setServiceParent(self)
-
+ # TODO: replace register_control() with an init_control() that
+ # internally uses self.when_tub_ready() to stall registerReference.
+ # Do the same for register_helper(). That will remove the need for
+ # this tub_ready() method.
self.register_control()
self.register_helper()
helper_furlfile = os.path.join(self.basedir, "private", "helper.furl")
self.tub.registerReference(h, furlFile=helper_furlfile)
- def remote_get_versions(self):
- return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)
-
- def remote_get_service(self, name):
- if name in ("storageserver",):
- return self.getServiceNamed(name)
- raise RuntimeError("I am unwilling to give you service %s" % name)
-
- def remote_get_nodeid(self):
- return self.nodeid
-
def get_all_peerids(self):
- if not self.introducer_client:
- return []
return self.introducer_client.get_all_peerids()
- def get_permuted_peers(self, key, include_myself=True):
+ def get_permuted_peers(self, service_name, key):
"""
- @return: list of (permuted-peerid, peerid, connection,)
+ @return: list of (peerid, connection,)
"""
- results = []
- for peerid, connection in self.introducer_client.get_all_peers():
- assert isinstance(peerid, str)
- if not include_myself and peerid == self.nodeid:
- self.log("get_permuted_peers: removing myself from the list")
- continue
- permuted = sha.new(key + peerid).digest()
- results.append((permuted, peerid, connection))
- results.sort()
- return results
+ assert isinstance(service_name, str)
+ assert isinstance(key, str)
+ return self.introducer_client.get_permuted_peers(service_name, key)
def get_push_to_ourselves(self):
return self.push_to_ourselves
def get_encoding_parameters(self):
- if not self.introducer_client:
- return self.DEFAULT_ENCODING_PARAMETERS
+ return self.DEFAULT_ENCODING_PARAMETERS
p = self.introducer_client.encoding_parameters # a tuple
# TODO: make the 0.7.1 introducer publish a dict instead of a tuple
params = {"k": p[0],
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
- everyone = list(self.parent.introducer_client.get_all_peers())
+ conns = self.parent.introducer_client.get_all_connections_for("storage")
+ everyone = [(peerid,rref) for (peerid, service_name, rref) in conns]
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
everyone = everyone * num_pings
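+        # illustrative arithmetic: with 5 connected storage servers,
+        # num_pings = div_ceil(10, 5*0.3) = 7, so 'everyone' grows to 35
+        # entries and we issue 35 pings, one after another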
d = self._do_one_ping(None, everyone, results)
return results
peerid, connection = everyone_left.pop(0)
start = time.time()
- d = connection.callRemote("get_nodeid")
+ d = connection.callRemote("get_versions")
def _done(ignored):
stop = time.time()
elapsed = stop - start
def _get_all_shareholders(self):
dl = []
- for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._storage_index):
- d = connection.callRemote("get_service", "storageserver")
- d.addCallback(lambda ss: ss.callRemote("get_buckets",
- self._storage_index))
- d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(connection,))
+ for (peerid,ss) in self._client.get_permuted_peers("storage",
+ self._storage_index):
+ d = ss.callRemote("get_buckets", self._storage_index)
+ d.addCallbacks(self._got_response, self._got_error)
dl.append(d)
return defer.DeferredList(dl)
- def _got_response(self, buckets, connection):
- _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
+ def _got_response(self, buckets):
for sharenum, bucket in buckets.iteritems():
b = storage.ReadBucketProxy(bucket)
self.add_share_bucket(sharenum, b)
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
-
-class RIIntroducerClient(RemoteInterface):
- def new_peers(furls=SetOf(FURL)):
+# Announcements are (FURL, service_name, remoteinterface_name,
+# nickname, my_version, oldest_supported)
+# the (FURL, service_name, remoteinterface_name) refer to the service being
+# announced. The (nickname, my_version, oldest_supported) refer to the
+# client as a whole. The my_version/oldest_supported strings can be parsed
+# by an allmydata.util.version.Version instance, and then compared. The
+# first goal is to make sure that nodes are not confused by speaking to an
+# incompatible peer. The second goal is to enable the development of
+# backwards-compatibility code.
+
+Announcement = TupleOf(FURL, str, str,
+ str, str, str)
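+
+# An illustrative announcement for a storage server (all values below are
+# hypothetical):
+#
+#   ("pb://u33m4y7k@example.net:8098/storage-swissnum", # FURL
+#    "storage",                                    # service_name
+#    "RIStorageServer.tahoe.allmydata.com",        # remoteinterface_name
+#    "mynickname", "0.7.0", "0.6.1")               # nickname, versions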
+
+class RIIntroducerSubscriberClient(RemoteInterface):
+ __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+
+ def announce(announcements=SetOf(Announcement)):
+ """I accept announcements from the publisher."""
return None
+
def set_encoding_parameters(parameters=(int, int, int)):
"""Advise the client of the recommended k-of-n encoding parameters
for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
"""
return None
-class RIIntroducer(RemoteInterface):
- def hello(node=RIIntroducerClient, furl=ChoiceOf(FURL, None)):
+# When Foolscap can handle multiple interfaces (Foolscap#17), the
+# full-powered introducer will implement both RIIntroducerPublisher and
+# RIIntroducerSubscriberService. Until then, we define
+# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
+# make everybody use that.
+
+class RIIntroducerPublisher(RemoteInterface):
+ """To publish a service to the world, connect to me and give me your
+ announcement message. I will deliver a copy to all connected subscribers."""
+ __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
+
+ def publish(announcement=Announcement):
+ # canary?
return None
-class RIClient(RemoteInterface):
- def get_versions():
- """Return a tuple of (my_version, oldest_supported) strings.
+class RIIntroducerSubscriberService(RemoteInterface):
+ __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
- Each string can be parsed by an allmydata.util.version.Version
- instance, and then compared. The first goal is to make sure that
- nodes are not confused by speaking to an incompatible peer. The
- second goal is to enable the development of backwards-compatibility
- code.
+ def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+ """Give me a subscriber reference, and I will call its new_peers()
+ method will any announcements that match the desired service name. I
+ will ignore duplicate subscriptions.
+ """
+ return None
- This method is likely to change in incompatible ways until we get the
- whole compatibility scheme nailed down.
+class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
+ __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+ def publish(announcement=Announcement):
+ return None
+ def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+ return None
+
+class IIntroducerClient(Interface):
+ """I provide service introduction facilities for a node. I help nodes
+ publish their services to the rest of the world, and I help them learn
+ about services available on other nodes."""
+
+ def publish(furl, service_name, remoteinterface_name):
+ """Once you call this, I will tell the world that the Referenceable
+ available at FURL is available to provide a service named
+ SERVICE_NAME. The precise definition of the service being provided is
+ identified by the Foolscap 'remote interface name' in the last
+ parameter: this is supposed to be a globally-unique string that
+ identifies the RemoteInterface that is implemented."""
+
+ def subscribe_to(service_name):
+ """Call this if you will eventually want to use services with the
+ given SERVICE_NAME. This will prompt me to subscribe to announcements
+ of those services. You can pick up the announcements later by calling
+ get_all_connections_for() or get_permuted_peers().
"""
- return TupleOf(str, str)
- def get_service(name=str):
- return Referenceable
- def get_nodeid():
- return Nodeid
+
+ def get_all_connections():
+ """Return a frozenset of (nodeid, service_name, rref) tuples, one for
+ each active connection we've established to a remote service. This is
+ mostly useful for unit tests that need to wait until a certain number
+ of connections have been made."""
+
+ def get_all_connectors():
+ """Return a dict that maps from (nodeid, service_name) to a
+ RemoteServiceConnector instance for all services that we are actively
+ trying to connect to. Each RemoteServiceConnector has the following
+ public attributes::
+
+ announcement_time: when we first heard about this service
+ last_connect_time: when we last established a connection
+ last_loss_time: when we last lost a connection
+
+ version: the peer's version, from the most recent connection
+ oldest_supported: the peer's oldest supported version, same
+
+ rref: the RemoteReference, if connected, otherwise None
+ remote_host: the IAddress, if connected, otherwise None
+
+ This method is intended for monitoring interfaces, such as a web page
+ which describes connecting and connected peers.
+ """
+
+ def get_all_peerids():
+ """Return a frozenset of all peerids to whom we have a connection (to
+ one or more services) established. Mostly useful for unit tests."""
+
+ def get_all_connections_for(service_name):
+ """Return a frozenset of (nodeid, service_name, rref) tuples, one
+ for each active connection that provides the given SERVICE_NAME."""
+
+ def get_permuted_peers(service_name, key):
+ """Returns an ordered list of (peerid, rref) tuples, selecting from
+ the connections that provide SERVICE_NAME, using a hash-based
+ permutation keyed by KEY. This randomizes the service list in a
+ repeatable way, to distribute load over many peers.
+ """
+
+ def connected_to_introducer():
+ """Returns a boolean, True if we are currently connected to the
+ introducer, False if not."""
+
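+# A minimal sketch of how a node drives IIntroducerClient, mirroring
+# init_introducer_client() and init_storage() in client.py above:
+#
+#   ic = IntroducerClient(tub, introducer_furl, nickname, ver, oldest)
+#   ic.setServiceParent(client)
+#   ic.subscribe_to("storage")
+#   ic.publish(storage_furl, "storage", RIStorageServer.__remote_name__)
+#   ...
+#   for (peerid, rref) in ic.get_permuted_peers("storage", storage_index):
+#       d = rref.callRemote("get_buckets", storage_index)
+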
class RIBucketWriter(RemoteInterface):
def write(offset=int, data=ShareData):
# returns data[offset:offset+length] for each element of TestVector
class RIStorageServer(RemoteInterface):
+ __remote_name__ = "RIStorageServer.tahoe.allmydata.com"
+
+ def get_versions():
+ """Return a tuple of (my_version, oldest_supported) strings.
+ Each string can be parsed by an allmydata.util.version.Version
+ instance, and then compared. The first goal is to make sure that
+ nodes are not confused by speaking to an incompatible peer. The
+ second goal is to enable the development of backwards-compatibility
+ code.
+
+ This method is likely to change in incompatible ways until we get the
+ whole compatibility scheme nailed down.
+ """
+ return TupleOf(str, str)
+
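+    # A hypothetical compatibility check a client could perform with these
+    # strings (assuming allmydata.util.version.Version objects compare in
+    # version order):
+    #
+    #   if Version(my_version) < Version(their_oldest_supported):
+    #       ...this peer is too new for us; don't use it...
+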
def allocate_buckets(storage_index=StorageIndex,
renew_secret=LeaseRenewSecret,
cancel_secret=LeaseCancelSecret,
-import re
-from base64 import b32encode, b32decode
+import re, time, sha
+from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
-from twisted.python import log
from foolscap import Referenceable
from allmydata import node
-from allmydata.interfaces import RIIntroducer, RIIntroducerClient
-from allmydata.util import observer
+from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
+ RIIntroducerSubscriberClient, IIntroducerClient
+from allmydata.util import observer, log, idlib
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
self.write_config("introducer.furl", self.introducer_url + "\n")
class IntroducerService(service.MultiService, Referenceable):
- implements(RIIntroducer)
+ implements(RIIntroducerPublisherAndSubscriberService)
name = "introducer"
def __init__(self, basedir=".", encoding_parameters=None):
service.MultiService.__init__(self)
self.introducer_url = None
- self.nodes = set()
- self.furls = set()
+ self._announcements = set()
+ self._subscribers = {}
self._encoding_parameters = encoding_parameters
- def remote_hello(self, node, furl):
- log.msg("introducer: new contact at %s, node is %s" % (furl, node))
+ def log(self, *args, **kwargs):
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.introducer"
+ return log.msg(*args, **kwargs)
+
+ def remote_publish(self, announcement):
+ self.log("introducer: announcement published: %s" % (announcement,) )
+ (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+ if announcement in self._announcements:
+ self.log("but we already knew it, ignoring", level=log.NOISY)
+ return
+ self._announcements.add(announcement)
+ for s in self._subscribers.get(service_name, []):
+ s.callRemote("announce", set([announcement]))
+
+ def remote_subscribe(self, subscriber, service_name):
+ self.log("introducer: subscription[%s] request at %s" % (service_name,
+ subscriber))
+ if service_name not in self._subscribers:
+ self._subscribers[service_name] = set()
+ subscribers = self._subscribers[service_name]
+ if subscriber in subscribers:
+ self.log("but they're already subscribed, ignoring",
+ level=log.UNUSUAL)
+ return
+ subscribers.add(subscriber)
def _remove():
- log.msg(" introducer: removing %s %s" % (node, furl))
- self.nodes.remove(node)
- if furl is not None:
- self.furls.remove(furl)
- node.notifyOnDisconnect(_remove)
- if furl is not None:
- self.furls.add(furl)
- for othernode in self.nodes:
- othernode.callRemote("new_peers", set([furl]))
- node.callRemote("new_peers", self.furls)
- if self._encoding_parameters is not None:
- node.callRemote("set_encoding_parameters",
- self._encoding_parameters)
- self.nodes.add(node)
+ self.log("introducer: unsubscribing[%s] %s" % (service_name,
+ subscriber))
+ subscribers.remove(subscriber)
+ subscriber.notifyOnDisconnect(_remove)
-class IntroducerClient(service.Service, Referenceable):
- implements(RIIntroducerClient)
+ announcements = set( [ a
+ for a in self._announcements
+ if a[1] == service_name ] )
+ d = subscriber.callRemote("announce", announcements)
+ d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
- def __init__(self, tub, introducer_furl, my_furl):
- self.tub = tub
- self.introducer_furl = introducer_furl
- self.my_furl = my_furl
+    def UNKNOWN(): # TODO: port to the new scheme: push encoding
+                   # parameters to subscribers
+ if self._encoding_parameters is not None:
+ node.callRemote("set_encoding_parameters",
+ self._encoding_parameters)
- self.connections = {} # k: nodeid, v: ref
- self.reconnectors = {} # k: FURL, v: reconnector
- self._connected = False
- self.connection_observers = observer.ObserverList()
- self.encoding_parameters = None
+class PeerCountObserver:
+ # This is used by unit test code to wait until peer connections have been
+ # established.
+ def __init__(self):
# The N'th element of _observers_of_enough_peers is None if nobody has
# asked to be informed when N peers become connected, it is a
# OneShotObserverList if someone has asked to be informed, and that list
# interested in (i.e., there are never trailing Nones in
# _observers_of_fewer_than_peers).
self._observers_of_fewer_than_peers = []
-
- def startService(self):
- service.Service.startService(self)
- self.introducer_reconnector = self.tub.connectTo(self.introducer_furl,
- self._got_introducer)
- def connect_failed(failure):
- self.log("\n\nInitial Introducer connection failed: "
- "perhaps it's down\n")
- self.log(str(failure))
- d = self.tub.getReference(self.introducer_furl)
- d.addErrback(connect_failed)
-
- def log(self, msg):
- self.parent.log(msg)
-
- def remote_new_peers(self, furls):
- for furl in furls:
- self._new_peer(furl)
-
- def remote_set_encoding_parameters(self, parameters):
- self.encoding_parameters = parameters
-
- def stopService(self):
- service.Service.stopService(self)
- self.introducer_reconnector.stopConnecting()
- for reconnector in self.reconnectors.itervalues():
- reconnector.stopConnecting()
+ self.connection_observers = observer.ObserverList()
def _notify_observers_of_enough_peers(self, numpeers):
if len(self._observers_of_enough_peers) > numpeers:
while len(self._observers_of_fewer_than_peers) > numpeers and (not self._observers_of_fewer_than_peers[-1]):
self._observers_of_fewer_than_peers.pop()
- def _new_peer(self, furl):
- if furl in self.reconnectors:
- return
- # TODO: rather than using the TubID as a nodeid, we should use
- # something else. The thing that requires the least additional
- # mappings is to use the foolscap "identifier" (the last component of
- # the furl), since these are unguessable. Before we can do that,
- # though, we need a way to conveniently make these identifiers
- # persist from one run of the client program to the next. Also, using
- # the foolscap identifier would mean that anyone who knows the name
- # of the node also has all the secrets they need to contact and use
- # them, which may or may not be what we want.
- m = re.match(r'pb://(\w+)@', furl)
- assert m
- nodeid = b32decode(m.group(1).upper())
- def _got_peer(rref):
- self.log("connected to %s" % b32encode(nodeid).lower()[:8])
- self.connection_observers.notify(nodeid, rref)
- self.connections[nodeid] = rref
- self._notify_observers_of_enough_peers(len(self.connections))
- self._notify_observers_of_fewer_than_peers(len(self.connections))
- def _lost():
- # TODO: notifyOnDisconnect uses eventually(), but connects do
- # not. Could this cause a problem?
-
- # We know that this observer list must have been fired, since we
- # had enough peers before this one was lost.
- self._remove_observers_of_enough_peers(len(self.connections))
- self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
-
- del self.connections[nodeid]
-
- rref.notifyOnDisconnect(_lost)
- self.log("connecting to %s" % b32encode(nodeid).lower()[:8])
- self.reconnectors[furl] = self.tub.connectTo(furl, _got_peer)
-
- def _got_introducer(self, introducer):
- if self.my_furl:
- my_furl_s = self.my_furl[6:13]
- else:
- my_furl_s = "<none>"
- self.log("introducing ourselves: %s, %s" % (self, my_furl_s))
- self._connected = True
- d = introducer.callRemote("hello",
- node=self,
- furl=self.my_furl)
- introducer.notifyOnDisconnect(self._disconnected)
-
- def _disconnected(self):
- self.log("bummer, we've lost our connection to the introducer")
- self._connected = False
-
- def notify_on_new_connection(self, cb):
- """Register a callback that will be fired (with nodeid, rref) when
- a new connection is established."""
- self.connection_observers.subscribe(cb)
-
- def connected_to_introducer(self):
- return self._connected
-
- def get_all_peerids(self):
- return self.connections.iterkeys()
-
- def get_all_peers(self):
- return self.connections.iteritems()
-
def when_enough_peers(self, numpeers):
"""
I return a deferred that fires the next time that at least
if not self._observers_of_fewer_than_peers[numpeers]:
self._observers_of_fewer_than_peers[numpeers] = observer.OneShotObserverList()
return self._observers_of_fewer_than_peers[numpeers].when_fired()
+
+ def notify_on_new_connection(self, cb):
+ """Register a callback that will be fired (with nodeid, rref) when
+ a new connection is established."""
+ self.connection_observers.subscribe(cb)
+
+ def add_peer(self, ann):
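+        # NOTE: this helper is not yet wired up (IntroducerClient below
+        # only creates it in a commented-out line), and it assumes its
+        # owner maintains a 'connections' collection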
+ self._notify_observers_of_enough_peers(len(self.connections))
+ self._notify_observers_of_fewer_than_peers(len(self.connections))
+
+ def remove_peer(self, ann):
+ self._remove_observers_of_enough_peers(len(self.connections))
+ self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
+
+
+
+class RemoteServiceConnector:
+ """I hold information about a peer service that we want to connect to. If
+ we are connected, I hold the RemoteReference, the peer's address, and the
+ peer's version information. I remember information about when we were
+ last connected to the peer too, even if we aren't currently connected.
+
+ @ivar announcement_time: when we first heard about this service
+ @ivar last_connect_time: when we last established a connection
+ @ivar last_loss_time: when we last lost a connection
+
+ @ivar version: the peer's version, from the most recent connection
+ @ivar oldest_supported: the peer's oldest supported version, same
+
+ @ivar rref: the RemoteReference, if connected, otherwise None
+ @ivar remote_host: the IAddress, if connected, otherwise None
+ """
+
+ def __init__(self, announcement, tub, ic):
+ self._tub = tub
+ self._announcement = announcement
+ self._ic = ic
+ (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+
+ self._furl = furl
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m
+ self._nodeid = b32decode(m.group(1).upper())
+ self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
+
+ self._index = (self._nodeid, service_name)
+ self._service_name = service_name
+
+ self.log("attempting to connect to %s" % self._nodeid_s)
+ self.announcement_time = time.time()
+ self.last_loss_time = None
+ self.rref = None
+ self.remote_host = None
+ self.last_connect_time = None
+ self.version = None
+ self.oldest_supported = None
+
+ def log(self, *args, **kwargs):
+ return self._ic.log(*args, **kwargs)
+
+ def get_index(self):
+ return self._index
+
+ def startConnecting(self):
+ self._reconnector = self._tub.connectTo(self._furl, self._got_service)
+
+ def stopConnecting(self):
+ self._reconnector.stopConnecting()
+
+ def _got_service(self, rref):
+ self.last_connect_time = time.time()
+ self.remote_host = str(rref.tracker.broker.transport.getPeer())
+
+ self.rref = rref
+ self.log("connected to %s" % self._nodeid_s)
+
+ self._ic.add_connection(self._nodeid, self._service_name, rref)
+
+ rref.notifyOnDisconnect(self._lost, rref)
+
+ def _lost(self, rref):
+ self.log("lost connection to %s" % self._nodeid_s)
+ self.last_loss_time = time.time()
+ self.rref = None
+ self.remote_host = None
+ self._ic.remove_connection(self._nodeid, self._service_name, rref)
+
+
+
+class IntroducerClient(service.Service, Referenceable):
+ implements(RIIntroducerSubscriberClient, IIntroducerClient)
+
+ def __init__(self, tub, introducer_furl,
+ nickname, my_version, oldest_supported):
+ self._tub = tub
+ self.introducer_furl = introducer_furl
+
+ self._nickname = nickname
+ self._my_version = my_version
+ self._oldest_supported = oldest_supported
+
+ self._published_announcements = set()
+
+ self._publisher = None
+ self._connected = False
+
+ self._subscribed_service_names = set()
+ self._subscriptions = set() # requests we've actually sent
+ self._received_announcements = set()
+ # TODO: this set will grow without bound, until the node is restarted
+
+ # we only accept one announcement per (peerid+service_name) pair.
+        # This ensures that an upgraded host replaces its previous
+        # announcement. It also means that each peer must have its own Tub
+ # (no sharing), which is slightly weird but consistent with the rest
+ # of the Tahoe codebase.
+ self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
+ # self._connections is a set of (peerid, service_name, rref) tuples
+ self._connections = set()
+
+ #self.counter = PeerCountObserver()
+ self.counter = 0 # incremented each time we change state, for tests
+ self.encoding_parameters = None
+
+ def startService(self):
+ service.Service.startService(self)
+ rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+ self._introducer_reconnector = rc
+ def connect_failed(failure):
+ self.log("Initial Introducer connection failed: perhaps it's down",
+ level=log.WEIRD, failure=failure)
+ d = self._tub.getReference(self.introducer_furl)
+ d.addErrback(connect_failed)
+
+ def _got_introducer(self, publisher):
+ self.log("connected to introducer")
+ self._connected = True
+ self._publisher = publisher
+ publisher.notifyOnDisconnect(self._disconnected)
+ self._maybe_publish()
+ self._maybe_subscribe()
+
+ def _disconnected(self):
+ self.log("bummer, we've lost our connection to the introducer")
+ self._connected = False
+ self._publisher = None
+ self._subscriptions.clear()
+
+ def stopService(self):
+ service.Service.stopService(self)
+ self._introducer_reconnector.stopConnecting()
+ for rsc in self._connectors.itervalues():
+ rsc.stopConnecting()
+
+ def log(self, *args, **kwargs):
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.introducer"
+ return log.msg(*args, **kwargs)
+
+
+ def publish(self, furl, service_name, remoteinterface_name):
+ ann = (furl, service_name, remoteinterface_name,
+ self._nickname, self._my_version, self._oldest_supported)
+ self._published_announcements.add(ann)
+ self._maybe_publish()
+
+ def subscribe_to(self, service_name):
+ self._subscribed_service_names.add(service_name)
+ self._maybe_subscribe()
+
+ def _maybe_subscribe(self):
+ if not self._publisher:
+ self.log("want to subscribe, but no introducer yet",
+ level=log.NOISY)
+ return
+ for service_name in self._subscribed_service_names:
+ if service_name not in self._subscriptions:
+ # there is a race here, but the subscription desk ignores
+ # duplicate requests.
+ self._subscriptions.add(service_name)
+ d = self._publisher.callRemote("subscribe", self, service_name)
+ d.addErrback(log.err, facility="tahoe.introducer",
+ level=log.WEIRD)
+
+ def _maybe_publish(self):
+ if not self._publisher:
+ self.log("want to publish, but no introducer yet", level=log.NOISY)
+ return
+ # this re-publishes everything. The Introducer ignores duplicates
+ for ann in self._published_announcements:
+ d = self._publisher.callRemote("publish", ann)
+ d.addErrback(log.err, facility="tahoe.introducer",
+ level=log.WEIRD)
+
+
+
+    def remote_announce(self, announcements):
+        self.log("received %d announcements" % len(announcements))
+        for ann in announcements:
+ (furl, service_name, ri_name, nickname, ver, oldest) = ann
+ if service_name not in self._subscribed_service_names:
+ self.log("announcement for a service we don't care about [%s]"
+ % (service_name,), level=log.WEIRD)
+ continue
+ if ann in self._received_announcements:
+ self.log("ignoring old announcement: %s" % (ann,),
+ level=log.NOISY)
+ continue
+ self.log("new announcement[%s]: %s" % (service_name, ann))
+ self._received_announcements.add(ann)
+ self._new_announcement(ann)
+
+ def _new_announcement(self, announcement):
+ # this will only be called for new announcements
+ rsc = RemoteServiceConnector(announcement, self._tub, self)
+ index = rsc.get_index()
+ if index in self._connectors:
+ self._connectors[index].stopConnecting()
+ self._connectors[index] = rsc
+ rsc.startConnecting()
+
+ def add_connection(self, nodeid, service_name, rref):
+ self._connections.add( (nodeid, service_name, rref) )
+ self.counter += 1
+
+ def remove_connection(self, nodeid, service_name, rref):
+ self._connections.discard( (nodeid, service_name, rref) )
+ self.counter += 1
+
+
+ def get_all_connections(self):
+ return frozenset(self._connections)
+
+ def get_all_connectors(self):
+ return self._connectors.copy()
+
+ def get_all_peerids(self):
+ return frozenset([peerid
+ for (peerid, service_name, rref)
+ in self._connections])
+
+ def get_all_connections_for(self, service_name):
+ return frozenset([c
+ for c in self._connections
+ if c[1] == service_name])
+
+ def get_permuted_peers(self, service_name, key):
+ """Return an ordered list of (peerid, rref) tuples."""
+ # TODO: flags like add-myself-at-beginning and remove-myself? maybe
+ # not.
+
+ results = []
+ for (c_peerid, c_service_name, rref) in self._connections:
+ assert isinstance(c_peerid, str)
+ if c_service_name != service_name:
+ continue
+ #if not include_myself and peerid == self.nodeid:
+ # self.log("get_permuted_peers: removing myself from the list")
+ # continue
+ permuted = sha.new(key + c_peerid).digest()
+ results.append((permuted, c_peerid, rref))
+
+ results.sort(lambda a,b: cmp(a[0], b[0]))
+ return [ (r[1], r[2]) for r in results ]
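+
+    # Illustrative: permuting five hypothetical peerids "0".."4" with
+    # key="one" ranks them by sha("one"+peerid) into the stable order
+    # ['3','1','0','4','2'], the order asserted by
+    # test_client.Basic.test_permute below.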
+
+ def _TODO__add_ourselves(self, partial_peerlist, peerlist):
+ # moved here from mutable.Publish
+ my_peerid = self._node._client.nodeid
+ for (permutedid, peerid, conn) in partial_peerlist:
+ if peerid == my_peerid:
+ # we're already in there
+ return partial_peerlist
+ for (permutedid, peerid, conn) in peerlist:
+ if peerid == self._node._client.nodeid:
+ # found it
+ partial_peerlist.append( (permutedid, peerid, conn) )
+ return partial_peerlist
+ self.log("we aren't in our own peerlist??", level=log.WEIRD)
+ return partial_peerlist
+
+
+
+ def remote_set_encoding_parameters(self, parameters):
+ self.encoding_parameters = parameters
+
+ def connected_to_introducer(self):
+ return self._connected
+
+ def debug_disconnect_from_peerid(self, victim_nodeid):
+ # for unit tests: locate and sever all connections to the given
+ # peerid.
+ for (nodeid, service_name, rref) in self._connections:
+ if nodeid == victim_nodeid:
+ rref.tracker.broker.transport.loseConnection()
def _choose_initial_peers(self, numqueries):
n = self._node
- full_peerlist = n._client.get_permuted_peers(self._storage_index,
- include_myself=True)
+ full_peerlist = n._client.get_permuted_peers("storage",
+ self._storage_index)
+ # TODO: include_myself=True
+
# _peerlist is a list of (peerid,conn) tuples for peers that are
# worth talking too. This starts with the first numqueries in the
# permuted list. If that's not enough to get us a recoverable
# version, we expand this to include the first 2*total_shares peerids
# (assuming we learn what total_shares is from one of the first
# numqueries peers)
- self._peerlist = [(p[1],p[2])
- for p in islice(full_peerlist, numqueries)]
+ self._peerlist = [p for p in islice(full_peerlist, numqueries)]
# _peerlist_limit is the query limit we used to build this list. If
# we later increase this limit, it may be useful to re-scan the
# permuted list.
self._queries_outstanding = set()
self._used_peers = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
- self._peer_storage_servers = {}
dl = []
- for (peerid, conn) in peerlist:
+ for (peerid, ss) in peerlist:
self._queries_outstanding.add(peerid)
- self._do_query(conn, peerid, self._storage_index, self._read_size,
- self._peer_storage_servers)
+ self._do_query(ss, peerid, self._storage_index, self._read_size)
# control flow beyond this point: state machine. Receiving responses
# from queries is the input. We might send out more queries, or we
# might produce a result.
return None
- def _do_query(self, conn, peerid, storage_index, readsize,
- peer_storage_servers):
+ def _do_query(self, ss, peerid, storage_index, readsize):
self._queries_outstanding.add(peerid)
- if peerid in peer_storage_servers:
- d = defer.succeed(peer_storage_servers[peerid])
- else:
- d = conn.callRemote("get_service", "storageserver")
- def _got_storageserver(ss):
- peer_storage_servers[peerid] = ss
- return ss
- d.addCallback(_got_storageserver)
- d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index,
- [], [(0, readsize)]))
- d.addCallback(self._got_results, peerid, readsize,
- (conn, storage_index, peer_storage_servers))
+ d = ss.callRemote("slot_readv", storage_index, [], [(0, readsize)])
+ d.addCallback(self._got_results, peerid, readsize, (ss, storage_index))
d.addErrback(self._query_failed, peerid)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
# TODO: for MDMF, sanity-check self._read_size: don't let one
# server cause us to try to read gigabytes of data from all
# other servers.
- (conn, storage_index, peer_storage_servers) = stuff
- self._do_query(conn, peerid, storage_index, self._read_size,
- peer_storage_servers)
+ (ss, storage_index) = stuff
+ self._do_query(ss, peerid, storage_index, self._read_size)
return
except CorruptShareError, e:
# log it and give the other shares a chance to be processed
self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
if self._peerlist_limit < search_distance:
# we might be able to get some more peers from the list
- peers = self._node._client.get_permuted_peers(self._storage_index,
- include_myself=True)
- self._peerlist = [(p[1],p[2])
- for p in islice(peers, search_distance)]
+ peers = self._node._client.get_permuted_peers("storage",
+ self._storage_index)
+ # TODO: include_myself=True
+ self._peerlist = [p for p in islice(peers, search_distance)]
self._peerlist_limit = search_distance
self.log("added peers, peerlist=%d, peerlist_limit=%d"
% (len(self._peerlist), self._peerlist_limit),
level=log.UNUSUAL)
# are there any peers on the list that we haven't used?
new_query_peers = []
- for (peerid, conn) in self._peerlist:
+ for (peerid, ss) in self._peerlist:
if peerid not in self._used_peers:
- new_query_peers.append( (peerid, conn) )
+ new_query_peers.append( (peerid, ss) )
if len(new_query_peers) > 5:
# only query in batches of 5. TODO: this is pretty
# arbitrary, really I want this to be something like
if new_query_peers:
self.log("sending %d new queries (read %d bytes)" %
(len(new_query_peers), self._read_size), level=log.UNUSUAL)
- for (peerid, conn) in new_query_peers:
- self._do_query(conn, peerid,
- self._storage_index, self._read_size,
- self._peer_storage_servers)
+ for (peerid, ss) in new_query_peers:
+ self._do_query(ss, peerid, self._storage_index, self._read_size)
# we'll retrigger when those queries come back
return
# the share we use for ourselves didn't count against the N total..
# maybe use N+1 if we find ourselves in the permuted list?
- peerlist = self._node._client.get_permuted_peers(storage_index,
- include_myself=True)
+ peerlist = self._node._client.get_permuted_peers("storage",
+ storage_index)
+ # make sure our local server is in the list
+ # TODO: include_myself_at_beginning=True
current_share_peers = DictOfSets()
reachable_peers = {}
- # list of (peerid, offset, length) where the encprivkey might be found
+ # list of (peerid, shnum, offset, length) where the encprivkey might
+ # be found
self._encprivkey_shares = []
EPSILON = total_shares / 2
#partial_peerlist = islice(peerlist, total_shares + EPSILON)
partial_peerlist = peerlist[:total_shares+EPSILON]
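+        # e.g. with total_shares=10 and EPSILON = 10/2 = 5, we start by
+        # querying the first 15 peers in the permuted list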
- # make sure our local server is in the list
- partial_peerlist = self._add_ourselves(partial_peerlist, peerlist)
+ self._storage_servers = {}
- peer_storage_servers = {}
dl = []
- for (permutedid, peerid, conn) in partial_peerlist:
- d = self._do_query(conn, peerid, peer_storage_servers,
- storage_index)
+ for permutedid, (peerid, ss) in enumerate(partial_peerlist):
+ self._storage_servers[peerid] = ss
+ d = self._do_query(ss, peerid, storage_index)
d.addCallback(self._got_query_results,
peerid, permutedid,
reachable_peers, current_share_peers)
d = defer.DeferredList(dl)
d.addCallback(self._got_all_query_results,
total_shares, reachable_peers,
- current_share_peers, peer_storage_servers)
+ current_share_peers)
# TODO: add an errback to, probably to ignore that peer
# TODO: if we can't get a privkey from these servers, consider
# looking farther afield. Make sure we include ourselves in the
# but ourselves.
return d
- def _add_ourselves(self, partial_peerlist, peerlist):
- my_peerid = self._node._client.nodeid
- for (permutedid, peerid, conn) in partial_peerlist:
- if peerid == my_peerid:
- # we're already in there
- return partial_peerlist
- for (permutedid, peerid, conn) in peerlist:
- if peerid == self._node._client.nodeid:
- # found it
- partial_peerlist.append( (permutedid, peerid, conn) )
- return partial_peerlist
- self.log("we aren't in our own peerlist??", level=log.WEIRD)
- return partial_peerlist
-
- def _do_query(self, conn, peerid, peer_storage_servers, storage_index):
+ def _do_query(self, ss, peerid, storage_index):
self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
- d = conn.callRemote("get_service", "storageserver")
- def _got_storageserver(ss):
- peer_storage_servers[peerid] = ss
- return ss.callRemote("slot_readv",
- storage_index, [], [(0, self._read_size)])
- d.addCallback(_got_storageserver)
+ d = ss.callRemote("slot_readv",
+ storage_index, [], [(0, self._read_size)])
return d
def _got_query_results(self, datavs, peerid, permutedid,
# files (since the privkey will be small enough to fit in the
# write cap).
-        self._encprivkey_shares.append( (peerid, offset, length) )
+        self._encprivkey_shares.append( (peerid, shnum, offset, length))
return
(seqnum, root_hash, IV, k, N, segsize, datalen,
def _got_all_query_results(self, res,
total_shares, reachable_peers,
- current_share_peers, peer_storage_servers):
+ current_share_peers):
self.log("_got_all_query_results")
# now that we know everything about the shares currently out there,
# decide where to place the new shares.
assert not shares_needing_homes
- target_info = (target_map, shares_per_peer, peer_storage_servers)
+ target_info = (target_map, shares_per_peer)
return target_info
def _obtain_privkey(self, target_info):
# peers one at a time until we get a copy. Only bother asking peers
# who've admitted to holding a share.
- target_map, shares_per_peer, peer_storage_servers = target_info
+ target_map, shares_per_peer = target_info
# pull shares from self._encprivkey_shares
if not self._encprivkey_shares:
raise NotEnoughPeersError("Unable to find a copy of the privkey")
(peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
+ ss = self._storage_servers[peerid]
self.log("trying to obtain privkey from %s shnum %d" %
(idlib.shortnodeid_b2a(peerid), shnum))
- d = self._do_privkey_query(peer_storage_servers[peerid], peerid,
- shnum, offset, length)
+ d = self._do_privkey_query(ss, peerid, shnum, offset, length)
d.addErrback(self.log_err)
d.addCallback(lambda res: self._obtain_privkey(target_info))
return d
# surprises here are *not* indications of UncoordinatedWriteError,
# and we'll need to respond to them more gracefully.)
- target_map, shares_per_peer, peer_storage_servers = target_info
+ target_map, shares_per_peer = target_info
my_checkstring = pack_checkstring(seqnum, root_hash, IV)
peer_messages = {}
cancel_secret = self._node.get_cancel_secret(peerid)
secrets = (write_enabler, renew_secret, cancel_secret)
- d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
+ d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
peerid, expected_old_shares[peerid], dispatch_map)
d.addCallback(lambda res: (self._surprised, dispatch_map))
return d
- def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
+ def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
- conn = peer_storage_servers[peerid]
storage_index = self._node._uri.storage_index
+ ss = self._storage_servers[peerid]
- d = conn.callRemote("slot_testv_and_readv_and_writev",
- storage_index,
- secrets,
- tw_vectors,
- read_vector)
+ d = ss.callRemote("slot_testv_and_readv_and_writev",
+ storage_index,
+ secrets,
+ tw_vectors,
+ read_vector)
return d
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
def _get_all_shareholders(self, storage_index):
dl = []
- for (pmpeerid, peerid, connection) in self._peer_getter(storage_index):
- d = connection.callRemote("get_service", "storageserver")
- d.addCallback(lambda ss: ss.callRemote("get_buckets",
- storage_index))
+ for (peerid, ss) in self._peer_getter("storage", storage_index):
+ d = ss.callRemote("get_buckets", storage_index)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(peerid,))
dl.append(d)
BadWriteEnablerError, IStatsProducer
from allmydata.util import fileutil, idlib, mathutil, log
from allmydata.util.assertutil import precondition, _assert
+import allmydata # for __version__
class DataTooLargeError(Exception):
pass
implements(RIStorageServer, IStatsProducer)
name = 'storageserver'
- def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None):
+ # we're pretty narrow-minded right now
+ OLDEST_SUPPORTED_VERSION = allmydata.__version__
+
+ def __init__(self, storedir, sizelimit=None,
+ discard_storage=False, readonly_storage=False,
+ stats_provider=None):
service.MultiService.__init__(self)
self.storedir = storedir
sharedir = os.path.join(storedir, "shares")
fileutil.make_dirs(sharedir)
self.sharedir = sharedir
self.sizelimit = sizelimit
- self.no_storage = no_storage
+ self.no_storage = discard_storage
+ self.readonly_storage = readonly_storage
self.stats_provider = stats_provider
if self.stats_provider:
self.stats_provider.register_producer(self)
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
+ lp = log.msg("StorageServer created, now measuring space..",
+ facility="tahoe.storage")
self.measure_size()
+ log.msg(format="space measurement done, consumed=%(consumed)d bytes",
+ consumed=self.consumed,
+ parent=lp, facility="tahoe.storage")
def log(self, *args, **kwargs):
- if self.parent:
- return self.parent.log(*args, **kwargs)
- return
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.storage"
+ return log.msg(*args, **kwargs)
def setNodeID(self, nodeid):
# somebody must set this before any slots can be created or leases
space += bw.allocated_size()
return space
+ def remote_get_versions(self):
+ return (str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION))
+
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
+ if self.readonly_storage:
+ # we won't accept new shares
+ return alreadygot, bucketwriters
+
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
class FakeIntroducerClient(introducer.IntroducerClient):
def __init__(self):
- self.connections = {}
-
-def permute(c, key):
- return [ y for x, y, z in c.get_permuted_peers(key) ]
+ self._connections = set()
+ def add_peer(self, nodeid):
+ entry = (nodeid, "storage", "rref")
+ self._connections.add(entry)
+ def remove_all_peers(self):
+ self._connections.clear()
class Basic(unittest.TestCase):
def test_loadable(self):
self.failUnlessEqual(c.getServiceNamed("storageserver").sizelimit,
None)
+ def _permute(self, c, key):
+ return [ peerid
+ for (peerid,rref) in c.get_permuted_peers("storage", key) ]
+
def test_permute(self):
basedir = "test_client.Basic.test_permute"
os.mkdir(basedir)
c = client.Client(basedir)
c.introducer_client = FakeIntroducerClient()
for k in ["%d" % i for i in range(5)]:
- c.introducer_client.connections[k] = None
- self.failUnlessEqual(permute(c, "one"), ['3','1','0','4','2'])
- self.failUnlessEqual(permute(c, "two"), ['0','4','2','1','3'])
- c.introducer_client.connections.clear()
- self.failUnlessEqual(permute(c, "one"), [])
+ c.introducer_client.add_peer(k)
+
+ self.failUnlessEqual(self._permute(c, "one"), ['3','1','0','4','2'])
+ self.failUnlessEqual(self._permute(c, "two"), ['0','4','2','1','3'])
+ c.introducer_client.remove_all_peers()
+ self.failUnlessEqual(self._permute(c, "one"), [])
c2 = client.Client(basedir)
c2.introducer_client = FakeIntroducerClient()
for k in ["%d" % i for i in range(5)]:
- c2.introducer_client.connections[k] = None
- self.failUnlessEqual(permute(c2, "one"), ['3','1','0','4','2'])
+ c2.introducer_client.add_peer(k)
+ self.failUnlessEqual(self._permute(c2, "one"), ['3','1','0','4','2'])
def test_versions(self):
basedir = "test_client.Basic.test_versions"
open(os.path.join(basedir, "introducer.furl"), "w").write("")
open(os.path.join(basedir, "vdrive.furl"), "w").write("")
c = client.Client(basedir)
- mine, oldest = c.remote_get_versions()
+ ss = c.getServiceNamed("storageserver")
+ mine, oldest = ss.remote_get_versions()
self.failUnlessEqual(mine, str(allmydata.__version__))
self.failIfEqual(str(allmydata.__version__), "unknown")
self.failUnless("." in str(allmydata.__version__),
return True
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
- def get_permuted_peers(self, storage_index):
+ def get_permuted_peers(self, service_name, storage_index):
return []
def flush_but_dont_ignore(res):
-from base64 import b32encode
+from base64 import b32decode
import os
from twisted.trial import unittest
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.python import log
from foolscap import Tub, Referenceable
def test_create(self):
- ic = IntroducerClient(None, "introducer", "myfurl")
- def _ignore(nodeid, rref):
- pass
- ic.notify_on_new_connection(_ignore)
+ ic = IntroducerClient(None, "introducer.furl", "my_nickname",
+ "my_version", "oldest_version")
def test_listen(self):
i = IntroducerService()
i = IntroducerService()
i.setServiceParent(self.parent)
- iurl = tub.registerReference(i)
+ introducer_furl = tub.registerReference(i)
NUMCLIENTS = 5
# we have 5 clients who publish themselves, and an extra one which
# does not. When the connections are fully established, all six nodes
n = FakeNode()
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
+ c = IntroducerClient(tub, introducer_furl,
+ "nickname-%d" % i, "version", "oldest")
if i < NUMCLIENTS:
node_furl = tub.registerReference(n)
- else:
- node_furl = None
- c = IntroducerClient(tub, iurl, node_furl)
+ c.publish(node_furl, "storage", "ri_name")
+ # the last one does not publish anything
+
+ c.subscribe_to("storage")
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
- def _wait_for_all_connections(res):
- dl = [] # list of when_enough_peers() for each peer
- # will fire once everybody is connected
+ def _wait_for_all_connections():
for c in clients:
- dl.append(c.when_enough_peers(NUMCLIENTS))
- return defer.DeferredList(dl, fireOnOneErrback=True)
-
- d = _wait_for_all_connections(None)
+ if len(c.get_all_connections()) < NUMCLIENTS:
+ return False
+ return True
+ d = self.poll(_wait_for_all_connections, timeout=5)
def _check1(res):
log.msg("doing _check1")
for c in clients:
- self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- self.failUnless(c._connected) # to the introducer
+ self.failUnless(c.connected_to_introducer())
+ self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
+ self.failUnlessEqual(len(c.get_all_peerids()), NUMCLIENTS)
+ self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+ NUMCLIENTS)
d.addCallback(_check1)
+
origin_c = clients[0]
def _disconnect_somebody_else(res):
# now disconnect somebody's connection to someone else
- # find a target that is not themselves
- for nodeid,rref in origin_c.connections.items():
- if b32encode(nodeid).lower() != tubs[origin_c].tubID:
- victim = rref
- break
- log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
- victim.tracker.broker.transport.loseConnection()
+ current_counter = origin_c.counter
+ victim_nodeid = b32decode(tubs[clients[1]].tubID.upper())
+ log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID,
+ victim_nodeid))
+ origin_c.debug_disconnect_from_peerid(victim_nodeid)
log.msg(" did disconnect")
+
+ # then wait until something changes, which ought to be them
+ # noticing the loss
+ def _compare():
+ return current_counter != origin_c.counter
+ return self.poll(_compare, timeout=5)
+
d.addCallback(_disconnect_somebody_else)
- def _wait_til_he_notices(res):
- # wait til the origin_c notices the loss
- log.msg(" waiting until peer notices the disconnection")
- return origin_c.when_fewer_than_peers(NUMCLIENTS)
- d.addCallback(_wait_til_he_notices)
- d.addCallback(_wait_for_all_connections)
+
+ # and wait for them to reconnect
+ d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5))
def _check2(res):
log.msg("doing _check2")
for c in clients:
- self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ self.failUnlessEqual(len(c.get_all_connections()), NUMCLIENTS)
d.addCallback(_check2)
+
def _disconnect_yourself(res):
# now disconnect somebody's connection to themselves.
- # find a target that *is* themselves
- for nodeid,rref in origin_c.connections.items():
- if b32encode(nodeid).lower() == tubs[origin_c].tubID:
- victim = rref
- break
- log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID, victim))
- victim.tracker.broker.transport.loseConnection()
+ current_counter = origin_c.counter
+ victim_nodeid = b32decode(tubs[clients[0]].tubID.upper())
+ log.msg(" disconnecting %s->%s" % (tubs[origin_c].tubID,
+ victim_nodeid))
+ origin_c.debug_disconnect_from_peerid(victim_nodeid)
log.msg(" did disconnect from self")
+
+ def _compare():
+ return current_counter != origin_c.counter
+ return self.poll(_compare, timeout=5)
d.addCallback(_disconnect_yourself)
- d.addCallback(_wait_til_he_notices)
- d.addCallback(_wait_for_all_connections)
+
+ d.addCallback(lambda res: self.poll(_wait_for_all_connections, timeout=5))
def _check3(res):
log.msg("doing _check3")
for c in clients:
- self.failUnlessEqual(len(c.connections), NUMCLIENTS)
+ self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+ NUMCLIENTS)
d.addCallback(_check3)
def _shutdown_introducer(res):
# now shut down the introducer. We do this by shutting down the
log.msg("shutting down the introducer")
return self.central_tub.disownServiceParent()
d.addCallback(_shutdown_introducer)
- d.addCallback(self.stall, 2)
+ def _wait_for_introducer_loss():
+ for c in clients:
+ if c.connected_to_introducer():
+ return False
+ return True
+ d.addCallback(lambda res: self.poll(_wait_for_introducer_loss, timeout=5))
+
def _check4(res):
log.msg("doing _check4")
for c in clients:
- self.failUnlessEqual(len(c.connections), NUMCLIENTS)
- self.failIf(c._connected)
+ self.failUnlessEqual(len(c.get_all_connections_for("storage")),
+ NUMCLIENTS)
+ self.failIf(c.connected_to_introducer())
d.addCallback(_check4)
return d
- test_system.timeout = 2400
-
- def stall(self, res, timeout):
- d = defer.Deferred()
- reactor.callLater(timeout, d.callback, res)
- return d
-
- def test_system_this_one_breaks(self):
- # this uses a single Tub, which has a strong effect on the
- # failingness
- tub = Tub()
- tub.setOption("logLocalFailures", True)
- tub.setOption("logRemoteFailures", True)
- tub.setServiceParent(self.parent)
- l = tub.listenOn("tcp:0")
- portnum = l.getPortnum()
- tub.setLocation("localhost:%d" % portnum)
-
- i = IntroducerService()
- i.setServiceParent(self.parent)
- iurl = tub.registerReference(i)
-
- clients = []
- for i in range(5):
- n = FakeNode()
- node_furl = tub.registerReference(n)
- c = IntroducerClient(tub, iurl, node_furl)
- c.setServiceParent(self.parent)
- clients.append(c)
-
- # time passes..
- d = defer.Deferred()
- def _check(res):
- log.msg("doing _check")
- self.failUnlessEqual(len(clients[0].connections), 5)
- d.addCallback(_check)
- reactor.callLater(2, d.callback, None)
- return d
- del test_system_this_one_breaks
-
-
- def test_system_this_one_breaks_too(self):
- # this one shuts down so quickly that it fails in a different way
- self.central_tub = tub = Tub()
- tub.setOption("logLocalFailures", True)
- tub.setOption("logRemoteFailures", True)
- tub.setServiceParent(self.parent)
- l = tub.listenOn("tcp:0")
- portnum = l.getPortnum()
- tub.setLocation("localhost:%d" % portnum)
-
- i = IntroducerService()
- i.setServiceParent(self.parent)
- iurl = tub.registerReference(i)
-
- clients = []
- for i in range(5):
- tub = Tub()
- tub.setOption("logLocalFailures", True)
- tub.setOption("logRemoteFailures", True)
- tub.setServiceParent(self.parent)
- l = tub.listenOn("tcp:0")
- portnum = l.getPortnum()
- tub.setLocation("localhost:%d" % portnum)
- n = FakeNode()
- node_furl = tub.registerReference(n)
- c = IntroducerClient(tub, iurl, node_furl)
- c.setServiceParent(self.parent)
- clients.append(c)
-
- # time passes..
- d = defer.Deferred()
- reactor.callLater(0.01, d.callback, None)
- def _check(res):
- log.msg("doing _check")
- self.fail("BOOM")
- for c in clients:
- self.failUnlessEqual(len(c.connections), 5)
- c.connections.values()[0].tracker.broker.transport.loseConnection()
- return self.stall(None, 2)
- d.addCallback(_check)
- def _check_again(res):
- log.msg("doing _check_again")
- for c in clients:
- self.failUnlessEqual(len(c.connections), 5)
- d.addCallback(_check_again)
- return d
- del test_system_this_one_breaks_too
return defer.succeed(None)
class FakePublish(mutable.Publish):
- def _do_query(self, conn, peerid, peer_storage_servers, storage_index):
- assert conn[0] == peerid
+ def _do_query(self, ss, peerid, storage_index):
+ assert ss[0] == peerid
shares = self._peers[peerid]
return defer.succeed(shares)
- def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
+ def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
# always-pass: parrot the test vectors back to them.
readv = {}
res = FakeFilenode(self).init_from_uri(u)
return res
- def get_permuted_peers(self, key, include_myself=True):
+ def get_permuted_peers(self, service_name, key):
+ # TODO: include_myself=True
"""
- @return: list of (permuted-peerid, peerid, connection,)
+ @return: list of (peerid, connection,)
"""
peers_and_connections = [(pid, (pid,)) for pid in self._peerids]
results = []
permuted = sha.new(key + peerid).digest()
results.append((permuted, peerid, connection))
results.sort()
+ # drop the permuted key, keep (peerid, connection)
+ results = [(peerid, connection) for (permuted, peerid, connection) in results]
return results
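The FakeClient above mirrors the real selection algorithm: every peer is
ranked by sha1(key + peerid), so each storage index sees a stable,
pseudo-random peer ordering. A self-contained sketch of that ranking (sha is
the Python 2.x module this code already uses; a real client would feed in its
live "storage" connections rather than a static list):

    import sha

    def permute_peers(key, peers):
        # peers: list of (peerid, connection) tuples
        ranked = [(sha.new(key + peerid).digest(), peerid, conn)
                  for (peerid, conn) in peers]
        ranked.sort()
        # drop the ranking digest, keep (peerid, connection)
        return [(peerid, conn) for (digest, peerid, conn) in ranked]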
def upload(self, uploadable):
total_shares = 10
d = p._query_peers(total_shares)
def _done(target_info):
- (target_map, shares_per_peer, peer_storage_servers) = target_info
+ (target_map, shares_per_peer) = target_info
shares_per_peer = {}
for shnum in target_map:
for (peerid, old_seqnum, old_R) in target_map[shnum]:
total_shares = 10
d = p._query_peers(total_shares)
def _done(target_info):
- (target_map, shares_per_peer, peer_storage_servers) = target_info
+ (target_map, shares_per_peer) = target_info
shares_per_peer = {}
for shnum in target_map:
for (peerid, old_seqnum, old_R) in target_map[shnum]:
enough to not fit inside a LIT uri.
"""
-class SystemTest(testutil.SignalMixin, unittest.TestCase):
+class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
def setUp(self):
self.sparent = service.MultiService()
d.addCallback(lambda res: c)
return d
+ def _check_connections(self):
+ for c in self.clients:
+ ic = c.introducer_client
+ if not ic.connected_to_introducer():
+ return False
+ if len(ic.get_all_peerids()) != self.numclients:
+ return False
+ return True
+
def wait_for_connections(self, ignored=None):
# TODO: replace this with something that takes a list of peerids and
# fires when they've all been heard from, instead of using a count
# and a threshold
- for c in self.clients:
- if (not c.introducer_client or
- len(list(c.get_all_peerids())) != self.numclients):
- d = defer.Deferred()
- d.addCallback(self.wait_for_connections)
- reactor.callLater(0.05, d.callback, None)
- return d
- return defer.succeed(None)
+ return self.poll(self._check_connections, timeout=200)
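poll() is what lets these tests drop the fixed reactor.callLater() stalls: it
re-runs a zero-argument predicate on a short interval and fires once the
predicate returns true, or errbacks on timeout. A minimal sketch of the
pattern (the real helper lives in testutil.PollMixin; the interval and the
timeout bookkeeping here are illustrative):

    from twisted.internet import defer, reactor

    def poll(check_f, pollinterval=0.05, timeout=None):
        d = defer.Deferred()
        elapsed = [0.0]  # mutable cell, for Python 2.x closures
        def _poll():
            if check_f():
                d.callback(None)
            elif timeout is not None and elapsed[0] >= timeout:
                d.errback(RuntimeError("poll timed out"))
            else:
                elapsed[0] += pollinterval
                reactor.callLater(pollinterval, _poll)
        _poll()
        return d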
def test_connections(self):
self.basedir = "system/SystemTest/test_connections"
for c in self.clients:
all_peerids = list(c.get_all_peerids())
self.failUnlessEqual(len(all_peerids), self.numclients+1)
- permuted_peers = list(c.get_permuted_peers("a", True))
+ permuted_peers = list(c.get_permuted_peers("storage", "a"))
self.failUnlessEqual(len(permuted_peers), self.numclients+1)
- permuted_other_peers = list(c.get_permuted_peers("a", False))
- self.failUnlessEqual(len(permuted_other_peers), self.numclients)
d.addCallback(_check)
def _shutdown_extra_node(res):
for c in self.clients:
all_peerids = list(c.get_all_peerids())
self.failUnlessEqual(len(all_peerids), self.numclients)
- permuted_peers = list(c.get_permuted_peers("a", True))
+ permuted_peers = list(c.get_permuted_peers("storage", "a"))
self.failUnlessEqual(len(permuted_peers), self.numclients)
- permuted_other_peers = list(c.get_permuted_peers("a", False))
- self.failUnlessEqual(len(permuted_other_peers), self.numclients-1)
d.addCallback(_check_connections)
def _do_upload(res):
log.msg("UPLOADING")
def _download_nonexistent_uri(res):
baduri = self.mangle_uri(self.uri)
+ log.msg("about to download non-existent URI", level=log.UNUSUAL,
+ facility="tahoe.tests")
d1 = self.downloader.download_to_data(baduri)
def _baduri_should_fail(res):
+ log.msg("finished downloading non-existend URI",
+ level=log.UNUSUAL, facility="tahoe.tests")
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError),
"expected NotEnoughPeersError, got %s" % res)
d.addCallback(self.log, "GOT WEB LISTENER")
return d
- def log(self, res, msg):
+ def log(self, res, msg, **kwargs):
# print "MSG: %s RES: %s" % (msg, res)
- log.msg(msg)
+ log.msg(msg, **kwargs)
return res
def stall(self, res, delay=1.0):
d.addCallback(_got_from_uri)
# download from a bogus URI, make sure we get a reasonable error
- d.addCallback(self.log, "_get_from_bogus_uri")
+ d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
def _get_from_bogus_uri(res):
d1 = getPage(base + "uri/%s?filename=%s"
% (self.mangle_uri(self.uri), "mydata567"))
"410")
return d1
d.addCallback(_get_from_bogus_uri)
+ d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
# upload a file with PUT
d.addCallback(self.log, "about to try PUT")
peers = set()
for shpeers in sharemap.values():
peers.update(shpeers)
- self.failUnlessEqual(len(peers), self.numclients-1)
+ self.failUnlessEqual(len(peers), self.numclients)
d.addCallback(_check_checker_results)
def _check_stored_results(res):
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
-from twisted.internet import defer
from cStringIO import StringIO
from allmydata import upload, encode, uri
d.addCallback(lambda res: u.close())
return d
-class FakePeer:
- def __init__(self, mode="good"):
- self.ss = FakeStorageServer(mode)
-
- def callRemote(self, methname, *args, **kwargs):
- def _call():
- meth = getattr(self, methname)
- return meth(*args, **kwargs)
- return defer.maybeDeferred(_call)
-
- def get_service(self, sname):
- assert sname == "storageserver"
- return self.ss
-
class FakeStorageServer:
def __init__(self, mode):
self.mode = mode
def log(self, *args, **kwargs):
pass
- def get_permuted_peers(self, storage_index, include_myself):
+ def get_permuted_peers(self, service_name, storage_index):
- peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
+ peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
for fakeid in range(self.num_servers) ]
- self.last_peers = [p[2] for p in peers]
+ self.last_peers = [p[1] for p in peers]
return peers
def get_push_to_ourselves(self):
return None
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
- allocated = p.ss.allocated
+ allocated = p.allocated
self.failUnlessEqual(len(allocated), 1)
- self.failUnlessEqual(p.ss.queries, 1)
+ self.failUnlessEqual(p.queries, 1)
d.addCallback(_check)
return d
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
- allocated = p.ss.allocated
+ allocated = p.allocated
self.failUnlessEqual(len(allocated), 2)
- self.failUnlessEqual(p.ss.queries, 2)
+ self.failUnlessEqual(p.queries, 2)
d.addCallback(_check)
return d
got_one = []
got_two = []
for p in self.node.last_peers:
- allocated = p.ss.allocated
+ allocated = p.allocated
self.failUnless(len(allocated) in (1,2), len(allocated))
if len(allocated) == 1:
- self.failUnlessEqual(p.ss.queries, 1)
+ self.failUnlessEqual(p.queries, 1)
got_one.append(p)
else:
- self.failUnlessEqual(p.ss.queries, 2)
+ self.failUnlessEqual(p.queries, 2)
got_two.append(p)
self.failUnlessEqual(len(got_one), 49)
self.failUnlessEqual(len(got_two), 1)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
- allocated = p.ss.allocated
+ allocated = p.allocated
self.failUnlessEqual(len(allocated), 4)
- self.failUnlessEqual(p.ss.queries, 2)
+ self.failUnlessEqual(p.queries, 2)
d.addCallback(_check)
return d
def _check(res):
counts = {}
for p in self.node.last_peers:
- allocated = p.ss.allocated
+ allocated = p.allocated
counts[len(allocated)] = counts.get(len(allocated), 0) + 1
histogram = [counts.get(i, 0) for i in range(5)]
self.failUnlessEqual(histogram, [0,0,0,2,1])
EXTENSION_SIZE = 1000
class PeerTracker:
- def __init__(self, peerid, permutedid, connection,
+ def __init__(self, peerid, storage_server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
precondition(isinstance(peerid, str), peerid)
precondition(len(peerid) == 20, peerid)
self.peerid = peerid
- self.permutedid = permutedid
- self.connection = connection # to an RIClient
+ self._storageserver = storage_server # to an RIStorageServer
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
- #print "PeerTracker", peerid, permutedid, sharesize
+ #print "PeerTracker", peerid, sharesize
as = storage.allocated_size(sharesize,
num_segments,
num_share_hashes,
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
self.storage_index = storage_index
- self._storageserver = None
self.renew_secret = bucket_renewal_secret
self.cancel_secret = bucket_cancel_secret
idlib.b2a(self.storage_index)[:6]))
def query(self, sharenums):
- if not self._storageserver:
- d = self.connection.callRemote("get_service", "storageserver")
- d.addCallback(self._got_storageserver)
- d.addCallback(lambda res: self._query(sharenums))
- return d
- return self._query(sharenums)
- def _got_storageserver(self, storageserver):
- self._storageserver = storageserver
- def _query(self, sharenums):
#print " query", self.peerid, len(sharenums)
d = self._storageserver.callRemote("allocate_buckets",
self.storage_index,
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
- peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+ peers = client.get_permuted_peers("storage", storage_index)
+ # TODO: push_to_ourselves
if not peers:
raise encode.NotEnoughPeersError("client gave us zero peers")
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
- trackers = [ PeerTracker(peerid, permutedid, conn,
+ trackers = [ PeerTracker(peerid, conn,
share_size, block_size,
num_segments, num_share_hashes,
storage_index,
bucket_cancel_secret_hash(file_cancel_secret,
peerid),
)
- for permutedid, peerid, conn in peers ]
+ for (peerid, conn) in peers ]
self.uncontacted_peers = trackers
d = defer.maybeDeferred(self._loop)
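With the get_service("storageserver") indirection gone, each PeerTracker holds
an RIStorageServer reference from the moment it is built, and the whole
allocation step is one remote call. A condensed sketch of the resulting flow
(the argument list is abbreviated to the fields visible in this patch, and
_got_reply is a hypothetical handler name):

    def query(self, sharenums):
        # single round-trip: no get_service() hop first
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size)
        d.addCallback(self._got_reply)  # hypothetical
        return d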