# 6: implement other sorts of IStorageClient classes: S3, etc
-import time
-from zope.interface import implements, Interface
+import re, time
+from zope.interface import implements
from foolscap.api import eventually
-from allmydata.interfaces import IStorageBroker
-from allmydata.util import idlib, log
+from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
+from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
- def __init__(self, tub, permute_peers):
+ def __init__(self, tub, permute_peers, preferred_peers=()):
self.tub = tub
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
+ self.preferred_peers = preferred_peers
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
# own Reconnector, and will give us a RemoteReference when we ask
self.introducer_client = None
# these two are used in unit tests
- def test_add_rref(self, serverid, rref):
- s = NativeStorageServer(serverid, {})
+ def test_add_rref(self, serverid, rref, ann):
+ s = NativeStorageServer(serverid, ann.copy())
s.rref = rref
+ s._is_connected = True
self.servers[serverid] = s
def test_add_server(self, serverid, s):
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement)
- def _got_announcement(self, serverid, ann_d):
- precondition(isinstance(serverid, str), serverid)
- precondition(len(serverid) == 20, serverid)
- assert ann_d["service-name"] == "storage"
+ def _got_announcement(self, key_s, ann):
+ if key_s is not None:
+ precondition(isinstance(key_s, str), key_s)
+ precondition(key_s.startswith("v0-"), key_s)
+ assert ann["service-name"] == "storage"
+ s = NativeStorageServer(key_s, ann)
+ serverid = s.get_serverid()
old = self.servers.get(serverid)
if old:
- if old.get_announcement() == ann_d:
+ if old.get_announcement() == ann:
return # duplicate
# replacement
del self.servers[serverid]
old.stop_connecting()
# now we forget about them and start using the new one
- dsc = NativeStorageServer(serverid, ann_d)
- self.servers[serverid] = dsc
- dsc.start_connecting(self.tub, self._trigger_connections)
+ self.servers[serverid] = s
+ s.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not.
def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers)
assert self.permute_peers == True
+ connected_servers = self.get_connected_servers()
+ preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
def _permuted(server):
seed = server.get_permutation_seed()
- return sha1(peer_selection_index + seed).digest()
- return sorted(self.get_connected_servers(), key=_permuted)
+ is_unpreferred = server not in preferred_servers
+ return (is_unpreferred, sha1(peer_selection_index + seed).digest())
+ return sorted(connected_servers, key=_permuted)
def get_all_serverids(self):
- serverids = set()
- serverids.update(self.servers.keys())
- return frozenset(serverids)
+ return frozenset(self.servers.keys())
def get_connected_servers(self):
- return frozenset([s for s in self.get_known_servers()
- if s.get_rref()])
+ return frozenset([s for s in self.servers.values() if s.is_connected()])
def get_known_servers(self):
- return sorted(self.servers.values(), key=lambda s: s.get_serverid())
+ return frozenset(self.servers.values())
def get_nickname_for_serverid(self, serverid):
if serverid in self.servers:
return self.servers[serverid].get_nickname()
return None
+ def get_stub_server(self, serverid):
+ if serverid in self.servers:
+ return self.servers[serverid]
+ return StubServer(serverid)
-class IServer(Interface):
- """I live in the client, and represent a single server."""
- def start_connecting(tub, trigger_cb):
- pass
- def get_nickname():
- pass
- def get_rref():
- pass
+class StubServer:
+ implements(IDisplayableServer)
+ def __init__(self, serverid):
+ self.serverid = serverid # binary tubid
+ def get_serverid(self):
+ return self.serverid
+ def get_name(self):
+ return base32.b2a(self.serverid)[:8]
+ def get_longname(self):
+ return base32.b2a(self.serverid)
+ def get_nickname(self):
+ return "?"
class NativeStorageServer:
"""I hold information about a storage server that we want to connect to.
VERSION_DEFAULTS = {
"http://allmydata.org/tahoe/protocols/storage/v1" :
- { "maximum-immutable-share-size": 2**32,
+ { "maximum-immutable-share-size": 2**32 - 1,
+ "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
"tolerates-immutable-read-overrun": False,
"delete-mutable-shares-with-zero-length-writev": False,
+ "available-space": None,
},
"application-version": "unknown: no get_version()",
}
- def __init__(self, serverid, ann_d, min_shares=1):
- self.serverid = serverid
- self._tubid = serverid
- self.announcement = ann_d
- self.min_shares = min_shares
+ def __init__(self, key_s, ann):
+ self.key_s = key_s
+ self.announcement = ann
+
+ assert "anonymous-storage-FURL" in ann, ann
+ furl = str(ann["anonymous-storage-FURL"])
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m, furl
+ tubid_s = m.group(1).lower()
+ self._tubid = base32.a2b(tubid_s)
+ assert "permutation-seed-base32" in ann, ann
+ ps = base32.a2b(str(ann["permutation-seed-base32"]))
+ self._permutation_seed = ps
+
+ if key_s:
+ self._long_description = key_s
+ if key_s.startswith("v0-"):
+ # remove v0- prefix from abbreviated name
+ self._short_description = key_s[3:3+8]
+ else:
+ self._short_description = key_s[:8]
+ else:
+ self._long_description = tubid_s
+ self._short_description = tubid_s[:6]
- self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
self.announcement_time = time.time()
self.last_connect_time = None
self.last_loss_time = None
self.remote_host = None
self.rref = None
+ self._is_connected = False
self._reconnector = None
self._trigger_cb = None
+ # Special methods used by copy.copy() and copy.deepcopy(). When those are
+ # used in allmydata.immutable.filenode to copy CheckResults during
+ # repair, we want it to treat the IServer instances as singletons, and
+ # not attempt to duplicate them..
+ def __copy__(self):
+ return self
+ def __deepcopy__(self, memodict):
+ return self
+
def __repr__(self):
return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self):
- return self._tubid
+ return self._tubid # XXX replace with self.key_s
def get_permutation_seed(self):
- return self._tubid
+ return self._permutation_seed
def get_version(self):
if self.rref:
return self.rref.version
return None
def get_name(self): # keep methodname short
- return self.serverid_s
+ # TODO: decide who adds [] in the short description. It should
+ # probably be the output side, not here.
+ return self._short_description
def get_longname(self):
- return idlib.nodeid_b2a(self._tubid)
+ return self._long_description
def get_lease_seed(self):
return self._tubid
def get_foolscap_write_enabler_seed(self):
return self._tubid
def get_nickname(self):
- return self.announcement["nickname"].decode("utf-8")
+ return self.announcement["nickname"]
def get_announcement(self):
return self.announcement
def get_remote_host(self):
return self.remote_host
+ def is_connected(self):
+ return self._is_connected
def get_last_connect_time(self):
return self.last_connect_time
def get_last_loss_time(self):
def get_announcement_time(self):
return self.announcement_time
+ def get_available_space(self):
+ version = self.get_version()
+ if version is None:
+ return None
+ protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
+ available_space = protocol_v1_version.get('available-space')
+ if available_space is None:
+ available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
+ return available_space
+
def start_connecting(self, tub, trigger_cb):
- furl = self.announcement["FURL"]
+ furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection)
self.last_connect_time = time.time()
self.remote_host = rref.getPeer()
self.rref = rref
+ self._is_connected = True
rref.notifyOnDisconnect(self._lost)
def get_rref(self):
log.msg(format="lost connection to %(name)s", name=self.get_name(),
facility="tahoe.storage_broker", umid="zbRllw")
self.last_loss_time = time.time()
- self.rref = None
+ # self.rref is now stale: all callRemote()s will get a
+ # DeadReferenceError. We leave the stale reference in place so that
+ # uploader/downloader code (which received this IServer through
+ # get_connected_servers() or get_servers_for_psi()) can continue to
+ # use s.get_rref().callRemote() and not worry about it being None.
+ self._is_connected = False
self.remote_host = None
def stop_connecting(self):