]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/storage_client.py
revert previous commit to fix attribution (vanity)
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
index 9d36a8bdf2aa9fe596c2f58206f9d1c422405074..dd9780f23a8c4efd8eefccd1de47cd3d7f92d060 100644 (file)
@@ -30,9 +30,9 @@ the foolscap-based server implemented in src/allmydata/storage/*.py .
 
 
 import re, time
-from zope.interface import implements, Interface
+from zope.interface import implements
 from foolscap.api import eventually
-from allmydata.interfaces import IStorageBroker
+from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
 from allmydata.util import log, base32
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
@@ -62,10 +62,11 @@ class StorageFarmBroker:
     I'm also responsible for subscribing to the IntroducerClient to find out
     about new servers as they are announced by the Introducer.
     """
-    def __init__(self, tub, permute_peers):
+    def __init__(self, tub, permute_peers, preferred_peers=()):
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
+        self.preferred_peers = preferred_peers
         # self.servers maps serverid -> IServer, and keeps track of all the
         # storage servers that we've heard about. Each descriptor manages its
         # own Reconnector, and will give us a RemoteReference when we ask
@@ -77,6 +78,7 @@ class StorageFarmBroker:
     def test_add_rref(self, serverid, rref, ann):
         s = NativeStorageServer(serverid, ann.copy())
         s.rref = rref
+        s._is_connected = True
         self.servers[serverid] = s
 
     def test_add_server(self, serverid, s):
@@ -120,16 +122,19 @@ class StorageFarmBroker:
     def get_servers_for_psi(self, peer_selection_index):
         # return a list of server objects (IServers)
         assert self.permute_peers == True
+        connected_servers = self.get_connected_servers()
+        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
         def _permuted(server):
             seed = server.get_permutation_seed()
-            return sha1(peer_selection_index + seed).digest()
-        return sorted(self.get_connected_servers(), key=_permuted)
+            is_unpreferred = server not in preferred_servers
+            return (is_unpreferred, sha1(peer_selection_index + seed).digest())
+        return sorted(connected_servers, key=_permuted)
 
     def get_all_serverids(self):
         return frozenset(self.servers.keys())
 
     def get_connected_servers(self):
-        return frozenset([s for s in self.servers.values() if s.get_rref()])
+        return frozenset([s for s in self.servers.values() if s.is_connected()])
 
     def get_known_servers(self):
         return frozenset(self.servers.values())
@@ -139,14 +144,23 @@ class StorageFarmBroker:
             return self.servers[serverid].get_nickname()
         return None
 
-class IServer(Interface):
-    """I live in the client, and represent a single server."""
-    def start_connecting(tub, trigger_cb):
-        pass
-    def get_nickname():
-        pass
-    def get_rref():
-        pass
+    def get_stub_server(self, serverid):
+        if serverid in self.servers:
+            return self.servers[serverid]
+        return StubServer(serverid)
+
+class StubServer:
+    implements(IDisplayableServer)
+    def __init__(self, serverid):
+        self.serverid = serverid # binary tubid
+    def get_serverid(self):
+        return self.serverid
+    def get_name(self):
+        return base32.b2a(self.serverid)[:8]
+    def get_longname(self):
+        return base32.b2a(self.serverid)
+    def get_nickname(self):
+        return "?"
 
 class NativeStorageServer:
     """I hold information about a storage server that we want to connect to.
@@ -168,17 +182,18 @@ class NativeStorageServer:
 
     VERSION_DEFAULTS = {
         "http://allmydata.org/tahoe/protocols/storage/v1" :
-        { "maximum-immutable-share-size": 2**32,
+        { "maximum-immutable-share-size": 2**32 - 1,
+          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
           "tolerates-immutable-read-overrun": False,
           "delete-mutable-shares-with-zero-length-writev": False,
+          "available-space": None,
           },
         "application-version": "unknown: no get_version()",
         }
 
-    def __init__(self, key_s, ann, min_shares=1):
+    def __init__(self, key_s, ann):
         self.key_s = key_s
         self.announcement = ann
-        self.min_shares = min_shares
 
         assert "anonymous-storage-FURL" in ann, ann
         furl = str(ann["anonymous-storage-FURL"])
@@ -190,18 +205,35 @@ class NativeStorageServer:
         ps = base32.a2b(str(ann["permutation-seed-base32"]))
         self._permutation_seed = ps
 
-        name = key_s or tubid_s
-        self._long_description = name
-        self._short_description = name[:8] # TODO: decide who adds []
+        if key_s:
+            self._long_description = key_s
+            if key_s.startswith("v0-"):
+                # remove v0- prefix from abbreviated name
+                self._short_description = key_s[3:3+8]
+            else:
+                self._short_description = key_s[:8]
+        else:
+            self._long_description = tubid_s
+            self._short_description = tubid_s[:6]
 
         self.announcement_time = time.time()
         self.last_connect_time = None
         self.last_loss_time = None
         self.remote_host = None
         self.rref = None
+        self._is_connected = False
         self._reconnector = None
         self._trigger_cb = None
 
+    # Special methods used by copy.copy() and copy.deepcopy(). When those are
+    # used in allmydata.immutable.filenode to copy CheckResults during
+    # repair, we want it to treat the IServer instances as singletons, and
+    # not attempt to duplicate them..
+    def __copy__(self):
+        return self
+    def __deepcopy__(self, memodict):
+        return self
+
     def __repr__(self):
         return "<NativeStorageServer for %s>" % self.get_name()
     def get_serverid(self):
@@ -213,6 +245,8 @@ class NativeStorageServer:
             return self.rref.version
         return None
     def get_name(self): # keep methodname short
+        # TODO: decide who adds [] in the short description. It should
+        # probably be the output side, not here.
         return self._short_description
     def get_longname(self):
         return self._long_description
@@ -222,11 +256,13 @@ class NativeStorageServer:
         return self._tubid
 
     def get_nickname(self):
-        return self.announcement["nickname"].decode("utf-8")
+        return self.announcement["nickname"]
     def get_announcement(self):
         return self.announcement
     def get_remote_host(self):
         return self.remote_host
+    def is_connected(self):
+        return self._is_connected
     def get_last_connect_time(self):
         return self.last_connect_time
     def get_last_loss_time(self):
@@ -234,6 +270,16 @@ class NativeStorageServer:
     def get_announcement_time(self):
         return self.announcement_time
 
+    def get_available_space(self):
+        version = self.get_version()
+        if version is None:
+            return None
+        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
+        available_space = protocol_v1_version.get('available-space')
+        if available_space is None:
+            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
+        return available_space
+
     def start_connecting(self, tub, trigger_cb):
         furl = str(self.announcement["anonymous-storage-FURL"])
         self._trigger_cb = trigger_cb
@@ -260,6 +306,7 @@ class NativeStorageServer:
         self.last_connect_time = time.time()
         self.remote_host = rref.getPeer()
         self.rref = rref
+        self._is_connected = True
         rref.notifyOnDisconnect(self._lost)
 
     def get_rref(self):
@@ -269,7 +316,12 @@ class NativeStorageServer:
         log.msg(format="lost connection to %(name)s", name=self.get_name(),
                 facility="tahoe.storage_broker", umid="zbRllw")
         self.last_loss_time = time.time()
-        self.rref = None
+        # self.rref is now stale: all callRemote()s will get a
+        # DeadReferenceError. We leave the stale reference in place so that
+        # uploader/downloader code (which received this IServer through
+        # get_connected_servers() or get_servers_for_psi()) can continue to
+        # use s.get_rref().callRemote() and not worry about it being None.
+        self._is_connected = False
         self.remote_host = None
 
     def stop_connecting(self):