new feature: preferred storage servers
src/allmydata/storage_client.py
index 69161924bb19b77912af2422c10991c9175e0965..dd9780f23a8c4efd8eefccd1de47cd3d7f92d060 100644
@@ -28,13 +28,15 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
 #
 # 6: implement other sorts of IStorageClient classes: S3, etc
 
-import sha, time
-from zope.interface import implements, Interface
+
+import re, time
+from zope.interface import implements
 from foolscap.api import eventually
-from allmydata.interfaces import IStorageBroker
-from allmydata.util import idlib, log
-from allmydata.util.assertutil import _assert, precondition
+from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
+from allmydata.util import log, base32
+from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
+from allmydata.util.hashutil import sha1
 
 # who is responsible for de-duplication?
 #  both?
@@ -60,44 +62,49 @@ class StorageFarmBroker:
     I'm also responsible for subscribing to the IntroducerClient to find out
     about new servers as they are announced by the Introducer.
     """
-    def __init__(self, tub, permute_peers):
+    def __init__(self, tub, permute_peers, preferred_peers=()):
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
-        # self.descriptors maps serverid -> IServerDescriptor, and keeps
-        # track of all the storage servers that we've heard about. Each
-        # descriptor manages its own Reconnector, and will give us a
-        # RemoteReference when we ask them for it.
-        self.descriptors = {}
-        # self.test_servers are statically configured from unit tests
-        self.test_servers = {} # serverid -> rref
+        self.preferred_peers = preferred_peers
+        # self.servers maps serverid -> IServer, and keeps track of all the
+        # storage servers that we've heard about. Each descriptor manages its
+        # own Reconnector, and will give us a RemoteReference when we ask
+        # them for it.
+        self.servers = {}
         self.introducer_client = None
 
     # these two are used in unit tests
-    def test_add_server(self, serverid, rref):
-        self.test_servers[serverid] = rref
-    def test_add_descriptor(self, serverid, dsc):
-        self.descriptors[serverid] = dsc
+    def test_add_rref(self, serverid, rref, ann):
+        s = NativeStorageServer(serverid, ann.copy())
+        s.rref = rref
+        s._is_connected = True
+        self.servers[serverid] = s
+
+    def test_add_server(self, serverid, s):
+        self.servers[serverid] = s
 
     def use_introducer(self, introducer_client):
         self.introducer_client = ic = introducer_client
         ic.subscribe_to("storage", self._got_announcement)
 
-    def _got_announcement(self, serverid, ann_d):
-        precondition(isinstance(serverid, str), serverid)
-        precondition(len(serverid) == 20, serverid)
-        assert ann_d["service-name"] == "storage"
-        old = self.descriptors.get(serverid)
+    def _got_announcement(self, key_s, ann):
+        if key_s is not None:
+            precondition(isinstance(key_s, str), key_s)
+            precondition(key_s.startswith("v0-"), key_s)
+        assert ann["service-name"] == "storage"
+        s = NativeStorageServer(key_s, ann)
+        serverid = s.get_serverid()
+        old = self.servers.get(serverid)
         if old:
-            if old.get_announcement() == ann_d:
+            if old.get_announcement() == ann:
                 return # duplicate
             # replacement
-            del self.descriptors[serverid]
+            del self.servers[serverid]
             old.stop_connecting()
             # now we forget about them and start using the new one
-        dsc = NativeStorageClientDescriptor(serverid, ann_d)
-        self.descriptors[serverid] = dsc
-        dsc.start_connecting(self.tub, self._trigger_connections)
+        self.servers[serverid] = s
+        s.start_connecting(self.tub, self._trigger_connections)
         # the descriptor will manage their own Reconnector, and each time we
         # need servers, we'll ask them if they're connected or not.
 
@@ -109,56 +116,53 @@ class StorageFarmBroker:
         # connections to only a subset of the servers, which would increase
         # the chances that we'll put shares in weird places (and not update
         # existing shares of mutable files). See #374 for more details.
-        for dsc in self.descriptors.values():
+        for dsc in self.servers.values():
             dsc.try_to_connect()
 
-
-
-    def get_servers_for_index(self, peer_selection_index):
-        # first cut: return a list of (peerid, versioned-rref) tuples
+    def get_servers_for_psi(self, peer_selection_index):
+        # return a list of server objects (IServers)
         assert self.permute_peers == True
-        servers = self.get_all_servers()
-        key = peer_selection_index
-        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
-
-    def get_all_servers(self):
-        # return a frozenset of (peerid, versioned-rref) tuples
-        servers = {}
-        for serverid,rref in self.test_servers.items():
-            servers[serverid] = rref
-        for serverid,dsc in self.descriptors.items():
-            rref = dsc.get_rref()
-            if rref:
-                servers[serverid] = rref
-        result = frozenset(servers.items())
-        _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
-        return result
+        connected_servers = self.get_connected_servers()
+        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
+        def _permuted(server):
+            seed = server.get_permutation_seed()
+            is_unpreferred = server not in preferred_servers
+            return (is_unpreferred, sha1(peer_selection_index + seed).digest())
+        return sorted(connected_servers, key=_permuted)
 
     def get_all_serverids(self):
-        serverids = set()
-        serverids.update(self.test_servers.keys())
-        serverids.update(self.descriptors.keys())
-        return frozenset(serverids)
+        return frozenset(self.servers.keys())
+
+    def get_connected_servers(self):
+        return frozenset([s for s in self.servers.values() if s.is_connected()])
 
-    def get_all_descriptors(self):
-        return sorted(self.descriptors.values(),
-                      key=lambda dsc: dsc.get_serverid())
+    def get_known_servers(self):
+        return frozenset(self.servers.values())
 
     def get_nickname_for_serverid(self, serverid):
-        if serverid in self.descriptors:
-            return self.descriptors[serverid].get_nickname()
+        if serverid in self.servers:
+            return self.servers[serverid].get_nickname()
         return None
 
+    def get_stub_server(self, serverid):
+        if serverid in self.servers:
+            return self.servers[serverid]
+        return StubServer(serverid)
 
-class IServerDescriptor(Interface):
-    def start_connecting(tub, trigger_cb):
-        pass
-    def get_nickname():
-        pass
-    def get_rref():
-        pass
+class StubServer:
+    implements(IDisplayableServer)
+    def __init__(self, serverid):
+        self.serverid = serverid # binary tubid
+    def get_serverid(self):
+        return self.serverid
+    def get_name(self):
+        return base32.b2a(self.serverid)[:8]
+    def get_longname(self):
+        return base32.b2a(self.serverid)
+    def get_nickname(self):
+        return "?"
 
-class NativeStorageClientDescriptor:
+class NativeStorageServer:
     """I hold information about a storage server that we want to connect to.
     If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
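Stepping back from the hunk above: the ordering in get_servers_for_psi() is the heart of the preferred-servers feature. Servers whose longname appears in preferred_peers sort ahead of the rest because False sorts before True, and within each group the familiar SHA-1 permutation of (peer selection index + permutation seed) still decides the order. A standalone sketch of that ordering, using hashlib directly rather than allmydata.util.hashutil:

import hashlib

def permuted_order(connected_servers, preferred_longnames, peer_selection_index):
    # Sketch of the sort key used by get_servers_for_psi(): a (bool, digest)
    # tuple, so preferred servers (False) sort before unpreferred ones (True),
    # and ties within each group fall back to the usual permuted ordering.
    def _key(server):
        is_unpreferred = server.get_longname() not in preferred_longnames
        digest = hashlib.sha1(peer_selection_index + server.get_permutation_seed()).digest()
        return (is_unpreferred, digest)
    return sorted(connected_servers, key=_key)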
@@ -174,40 +178,91 @@ class NativeStorageClientDescriptor:
     @ivar rref: the RemoteReference, if connected, otherwise None
     @ivar remote_host: the IAddress, if connected, otherwise None
     """
-    implements(IServerDescriptor)
+    implements(IServer)
 
     VERSION_DEFAULTS = {
         "http://allmydata.org/tahoe/protocols/storage/v1" :
-        { "maximum-immutable-share-size": 2**32,
+        { "maximum-immutable-share-size": 2**32 - 1,
+          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
           "tolerates-immutable-read-overrun": False,
           "delete-mutable-shares-with-zero-length-writev": False,
+          "available-space": None,
           },
         "application-version": "unknown: no get_version()",
         }
 
-    def __init__(self, serverid, ann_d, min_shares=1):
-        self.serverid = serverid
-        self.announcement = ann_d
-        self.min_shares = min_shares
+    def __init__(self, key_s, ann):
+        self.key_s = key_s
+        self.announcement = ann
+
+        assert "anonymous-storage-FURL" in ann, ann
+        furl = str(ann["anonymous-storage-FURL"])
+        m = re.match(r'pb://(\w+)@', furl)
+        assert m, furl
+        tubid_s = m.group(1).lower()
+        self._tubid = base32.a2b(tubid_s)
+        assert "permutation-seed-base32" in ann, ann
+        ps = base32.a2b(str(ann["permutation-seed-base32"]))
+        self._permutation_seed = ps
+
+        if key_s:
+            self._long_description = key_s
+            if key_s.startswith("v0-"):
+                # remove v0- prefix from abbreviated name
+                self._short_description = key_s[3:3+8]
+            else:
+                self._short_description = key_s[:8]
+        else:
+            self._long_description = tubid_s
+            self._short_description = tubid_s[:6]
 
-        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
         self.announcement_time = time.time()
         self.last_connect_time = None
         self.last_loss_time = None
         self.remote_host = None
         self.rref = None
+        self._is_connected = False
         self._reconnector = None
         self._trigger_cb = None
 
+    # Special methods used by copy.copy() and copy.deepcopy(). When those are
+    # used in allmydata.immutable.filenode to copy CheckResults during
+    # repair, we want it to treat the IServer instances as singletons, and
+    # not attempt to duplicate them.
+    def __copy__(self):
+        return self
+    def __deepcopy__(self, memodict):
+        return self
+
+    def __repr__(self):
+        return "<NativeStorageServer for %s>" % self.get_name()
     def get_serverid(self):
-        return self.serverid
+        return self._tubid # XXX replace with self.key_s
+    def get_permutation_seed(self):
+        return self._permutation_seed
+    def get_version(self):
+        if self.rref:
+            return self.rref.version
+        return None
+    def get_name(self): # keep methodname short
+        # TODO: decide who adds [] in the short description. It should
+        # probably be the output side, not here.
+        return self._short_description
+    def get_longname(self):
+        return self._long_description
+    def get_lease_seed(self):
+        return self._tubid
+    def get_foolscap_write_enabler_seed(self):
+        return self._tubid
 
     def get_nickname(self):
-        return self.announcement["nickname"].decode("utf-8")
+        return self.announcement["nickname"]
     def get_announcement(self):
         return self.announcement
     def get_remote_host(self):
         return self.remote_host
+    def is_connected(self):
+        return self._is_connected
     def get_last_connect_time(self):
         return self.last_connect_time
     def get_last_loss_time(self):
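The serverid itself is still the foolscap tubid, parsed out of the anonymous-storage-FURL and base32-decoded (the XXX comment notes the intent to move to key_s eventually), while the permutation seed now comes from the announcement instead of being derived from the serverid. The FURL parsing can be reproduced with the standard library; this is a sketch of what NativeStorageServer.__init__ does above, not the helper tahoe actually uses (that is allmydata.util.base32):

import re, base64

def tubid_from_furl(furl):
    # Extract the binary tubid from a foolscap FURL of the form
    # pb://<base32 tubid>@<connection hints>/<swissnum>.
    m = re.match(r'pb://(\w+)@', furl)
    if not m:
        raise ValueError("not a FURL: %r" % (furl,))
    tubid_s = m.group(1).lower()
    # allmydata.util.base32 is unpadded lowercase base32; emulate it with the stdlib.
    pad = "=" * ((8 - len(tubid_s) % 8) % 8)
    return base64.b32decode(tubid_s.upper() + pad)

# Made-up FURL: a real tubid is 32 base32 characters, i.e. 20 bytes.
furl = "pb://ysbz4st7sqk2jmfq3o4zehhsim45s3yf@tcp:127.0.0.1:44801/swissnum"
assert len(tubid_from_furl(furl)) == 20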
@@ -215,14 +270,24 @@ class NativeStorageClientDescriptor:
     def get_announcement_time(self):
         return self.announcement_time
 
+    def get_available_space(self):
+        version = self.get_version()
+        if version is None:
+            return None
+        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
+        available_space = protocol_v1_version.get('available-space')
+        if available_space is None:
+            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
+        return available_space
+
     def start_connecting(self, tub, trigger_cb):
-        furl = self.announcement["FURL"]
+        furl = str(self.announcement["anonymous-storage-FURL"])
         self._trigger_cb = trigger_cb
         self._reconnector = tub.connectTo(furl, self._got_connection)
 
     def _got_connection(self, rref):
-        lp = log.msg(format="got connection to %(serverid)s, getting versions",
-                     serverid=self.serverid_s,
+        lp = log.msg(format="got connection to %(name)s, getting versions",
+                     name=self.get_name(),
                      facility="tahoe.storage_broker", umid="coUECQ")
         if self._trigger_cb:
             eventually(self._trigger_cb)
@@ -230,28 +295,33 @@ class NativeStorageClientDescriptor:
         d = add_version_to_remote_reference(rref, default)
         d.addCallback(self._got_versioned_service, lp)
         d.addErrback(log.err, format="storageclient._got_connection",
-                     serverid=self.serverid_s, umid="Sdq3pg")
+                     name=self.get_name(), umid="Sdq3pg")
 
     def _got_versioned_service(self, rref, lp):
-        log.msg(format="%(serverid)s provided version info %(version)s",
-                serverid=self.serverid_s, version=rref.version,
+        log.msg(format="%(name)s provided version info %(version)s",
+                name=self.get_name(), version=rref.version,
                 facility="tahoe.storage_broker", umid="SWmJYg",
                 level=log.NOISY, parent=lp)
 
         self.last_connect_time = time.time()
         self.remote_host = rref.getPeer()
         self.rref = rref
+        self._is_connected = True
         rref.notifyOnDisconnect(self._lost)
 
     def get_rref(self):
         return self.rref
 
     def _lost(self):
-        log.msg(format="lost connection to %(serverid)s",
-                serverid=self.serverid_s,
+        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                 facility="tahoe.storage_broker", umid="zbRllw")
         self.last_loss_time = time.time()
-        self.rref = None
+        # self.rref is now stale: all callRemote()s will get a
+        # DeadReferenceError. We leave the stale reference in place so that
+        # uploader/downloader code (which received this IServer through
+        # get_connected_servers() or get_servers_for_psi()) can continue to
+        # use s.get_rref().callRemote() and not worry about it being None.
+        self._is_connected = False
         self.remote_host = None
 
     def stop_connecting(self):
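
Finally, some context on how the new constructor argument is meant to be used: the client builds the broker with a tuple of preferred server longnames, and every later call to get_servers_for_psi() honors it. The configuration side of this lives outside this file, so the sketch below (assuming tahoe-lafs and foolscap are importable) uses an invented longname purely for illustration.

from foolscap.api import Tub
from allmydata.storage_client import StorageFarmBroker

# Invented-for-illustration longname: the real value is a server's "v0-..."
# announcement key, or its base32 tubid if it announced no key.
preferred = ("v0-" + "a" * 52,)

tub = Tub()
broker = StorageFarmBroker(tub, permute_peers=True, preferred_peers=preferred)
# broker.get_servers_for_psi(storage_index) will now list any connected server
# whose get_longname() is in preferred ahead of the others.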