rearrange client setup, factor out common Node functionality, add Uploader service...
authorBrian Warner <warner@lothar.com>
Sun, 3 Dec 2006 01:27:18 +0000 (18:27 -0700)
committerBrian Warner <warner@lothar.com>
Sun, 3 Dec 2006 01:27:18 +0000 (18:27 -0700)
allmydata/client.py
allmydata/node.py [new file with mode: 0644]
allmydata/queen.py
allmydata/test/test_upload.py
allmydata/upload.py
client.tac

index 7ad410539629f36c0ddcc8d7fd01cf32005a64df..df17ac7240433a41705619872b1be2030ba47f3d 100644 (file)
@@ -1,12 +1,11 @@
 
-import os.path
 import sha
-from foolscap import Tub, Referenceable
+from foolscap import Referenceable
 from twisted.application import service
 from twisted.python import log
-from allmydata.util.iputil import get_local_ip_for
 from zope.interface import implements
 from allmydata.interfaces import RIClient
+from allmydata import node
 
 from twisted.internet import defer, reactor
 # this BlockingResolver is because otherwise unit tests must sometimes deal
@@ -16,62 +15,53 @@ from twisted.internet.base import BlockingResolver
 reactor.installResolver(BlockingResolver())
 
 from allmydata.storageserver import StorageServer
+from allmydata.upload import Uploader
 from allmydata.util import idlib
 
-class Client(service.MultiService, Referenceable):
+class Client(node.Node, Referenceable):
     implements(RIClient)
     CERTFILE = "client.pem"
+    PORTNUMFILE = "client.port"
     STOREDIR = 'storage'
+    NODETYPE = "client"
 
-    def __init__(self, queen_pburl):
-        service.MultiService.__init__(self)
-        self.queen_pburl = queen_pburl
-        if os.path.exists(self.CERTFILE):
-            self.tub = Tub(certData=open(self.CERTFILE, "rb").read())
-        else:
-            self.tub = Tub()
-            f = open(self.CERTFILE, "wb")
-            f.write(self.tub.getCertData())
-            f.close()
-        self.nodeid = idlib.a2b(self.tub.tubID)
-        self.tub.setServiceParent(self)
+    def __init__(self, basedir="."):
+        node.Node.__init__(self, basedir)
         self.queen = None # self.queen is either None or a RemoteReference
         self.all_peers = set()
         self.connections = {}
-        s = StorageServer(self.STOREDIR)
-        s.setServiceParent(self)
-
-        AUTHKEYSFILEBASE = "authorized_keys."
-        for f in os.listdir("."):
-            if f.startswith(AUTHKEYSFILEBASE):
-                portnum = int(f[len(AUTHKEYSFILEBASE):])
-                from allmydata import manhole
-                m = manhole.AuthorizedKeysManhole(portnum, f)
-                m.setServiceParent(self)
-                log.msg("AuthorizedKeysManhole listening on %d" % portnum)
-
-    def _setup_tub(self, local_ip):
-        portnum = 0
-        l = self.tub.listenOn("tcp:%d" % portnum)
-        self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum()))
-        self.my_pburl = self.tub.registerReference(self)
+        self.add_service(StorageServer(self.STOREDIR))
+        self.add_service(Uploader())
+        self.queen_pburl = None
+        self.queen_connector = None
+        self.my_pburl = None
 
-    def startService(self):
-        # note: this class can only be started and stopped once.
-        service.MultiService.startService(self)
-        d = get_local_ip_for()
-        d.addCallback(self._setup_tub)
-        if self.queen_pburl:
-            # TODO: maybe this should wait for tub.setLocation ?
-            self.connector = self.tub.connectTo(self.queen_pburl,
-                                                self._got_queen)
-        else:
+    def set_queen_pburl(self, queen_pburl):
+        self.queen_pburl = queen_pburl
+        self.maybe_connect_to_queen()
+
+    def maybe_connect_to_queen(self):
+        if not self.running:
+            return
+        if not self.my_pburl:
+            return
+        if self.queen_connector:
+            return
+        if not self.queen_pburl:
             log.msg("no queen_pburl, cannot connect")
+            return
+        self.queen_connector = self.tub.connectTo(self.queen_pburl,
+                                                  self._got_queen)
+
+    def tub_ready(self, tub):
+        self.my_pburl = self.tub.registerReference(self)
+        self.maybe_connect_to_queen()
 
     def stopService(self):
-        if self.queen_pburl:
-            self.connector.stopConnecting()
-        service.MultiService.stopService(self)
+        if self.queen_connector:
+            self.queen_connector.stopConnecting()
+            self.queen_connector = None
+        return service.MultiService.stopService(self)
 
     def _got_queen(self, queen):
         log.msg("connected to queen")
diff --git a/allmydata/node.py b/allmydata/node.py
new file mode 100644 (file)
index 0000000..201b754
--- /dev/null
@@ -0,0 +1,80 @@
+
+from twisted.application import service
+import os.path
+from foolscap import Tub
+from allmydata.util.iputil import get_local_ip_for
+from allmydata.util import idlib
+from twisted.python import log
+
+class Node(service.MultiService):
+    # this implements common functionality of both Client nodes and the Queen
+    # node.
+    NODETYPE = "unknown NODETYPE"
+    PORTNUMFILE = None
+    CERTFILE = None
+
+    def __init__(self, basedir="."):
+        service.MultiService.__init__(self)
+        self.basedir = os.path.abspath(basedir)
+        assert self.CERTFILE, "Your node.Node subclass must provide CERTFILE"
+        certfile = os.path.join(self.basedir, self.CERTFILE)
+        if os.path.exists(certfile):
+            f = open(certfile, "rb")
+            self.tub = Tub(certData=f.read())
+            f.close()
+        else:
+            self.tub = Tub()
+            f = open(certfile, "wb")
+            f.write(self.tub.getCertData())
+            f.close()
+        self.nodeid = idlib.a2b(self.tub.tubID)
+        portnum = 0
+        assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE"
+        self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
+        if os.path.exists(self._portnumfile):
+            portnum = int(open(self._portnumfile, "r").read())
+        self.tub.listenOn("tcp:%d" % portnum)
+        # we must wait until our service has started before we can find out
+        # our IP address and thus do tub.setLocation, and we can't register
+        # any services with the Tub until after that point
+        self.tub.setServiceParent(self)
+
+        AUTHKEYSFILEBASE = "authorized_keys."
+        for f in os.listdir(self.basedir):
+            if f.startswith(AUTHKEYSFILEBASE):
+                keyfile = os.path.join(self.basedir, f)
+                portnum = int(f[len(AUTHKEYSFILEBASE):])
+                from allmydata import manhole
+                m = manhole.AuthorizedKeysManhole(portnum, keyfile)
+                m.setServiceParent(self)
+                log.msg("AuthorizedKeysManhole listening on %d" % portnum)
+
+    def _setup_tub(self, local_ip):
+        l = self.tub.getListeners()[0]
+        portnum = l.getPortnum()
+        self.tub.setLocation("%s:%d" % (local_ip, portnum))
+        if not os.path.exists(self._portnumfile):
+            # record which port we're listening on, so we can grab the same
+            # one next time
+            f = open(self._portnumfile, "w")
+            f.write("%d\n" % portnum)
+            f.close()
+        self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum()))
+        return self.tub
+
+    def tub_ready(self, tub):
+        # this is called when the Tub has a location
+        pass
+
+    def add_service(self, s):
+        s.setServiceParent(self)
+        return s
+
+    def startService(self):
+        # note: this class can only be started and stopped once.
+        service.MultiService.startService(self)
+        d = get_local_ip_for()
+        d.addCallback(self._setup_tub)
+        d.addCallback(self.tub_ready)
+        d.addCallback(lambda res: log.msg("%s running" % self.NODETYPE))
+
index 0a75a93005a03667f3751435d32cc3dfa6a5ffec..5db057881da5b3385114bc7e123aa41ed7dd6077 100644 (file)
@@ -1,13 +1,12 @@
 
-from foolscap import Tub, Referenceable
+from foolscap import Referenceable
 from foolscap.eventual import eventually
 from twisted.application import service
 from twisted.python import log
-import os.path
-from allmydata.util.iputil import get_local_ip_for
 from allmydata.util import idlib
 from zope.interface import implements
 from allmydata.interfaces import RIQueenRoster
+from allmydata import node
 
 class Roster(service.MultiService, Referenceable):
     implements(RIQueenRoster)
@@ -45,62 +44,17 @@ class Roster(service.MultiService, Referenceable):
 
 
 
-class Queen(service.MultiService):
+class Queen(node.Node):
     CERTFILE = "queen.pem"
     PORTNUMFILE = "queen.port"
+    NODETYPE = "queen"
 
-    def __init__(self):
-        service.MultiService.__init__(self)
-        if os.path.exists(self.CERTFILE):
-            self.tub = Tub(certData=open(self.CERTFILE, "rb").read())
-        else:
-            self.tub = Tub()
-            f = open(self.CERTFILE, "wb")
-            f.write(self.tub.getCertData())
-            f.close()
-        portnum = 0
-        if os.path.exists(self.PORTNUMFILE):
-            portnum = int(open(self.PORTNUMFILE, "r").read())
-        self.tub.listenOn("tcp:%d" % portnum)
-        # we must wait until our service has started before we can find out
-        # our IP address and thus do tub.setLocation, and we can't register
-        # any services with the Tub until after that point
-        self.tub.setServiceParent(self)
+    def __init__(self, basedir="."):
+        node.Node.__init__(self, basedir)
         self.urls = {}
 
-        AUTHKEYSFILEBASE = "authorized_keys."
-        for f in os.listdir("."):
-            if f.startswith(AUTHKEYSFILEBASE):
-                portnum = int(f[len(AUTHKEYSFILEBASE):])
-                from allmydata import manhole
-                m = manhole.AuthorizedKeysManhole(portnum, f)
-                m.setServiceParent(self)
-                log.msg("AuthorizedKeysManhole listening on %d" % portnum)
-
-    def _setup_tub(self, local_ip):
-        l = self.tub.getListeners()[0]
-        portnum = l.getPortnum()
-        self.tub.setLocation("%s:%d" % (local_ip, portnum))
-        if not os.path.exists(self.PORTNUMFILE):
-            # record which port we're listening on, so we can grab the same
-            # one next time
-            f = open(self.PORTNUMFILE, "w")
-            f.write("%d\n" % portnum)
-            f.close()
-        self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum()))
-        return local_ip
-
-    def _setup_services(self, local_ip):
-        r = Roster()
-        r.setServiceParent(self)
+    def tub_ready(self, tub):
+        r = self.add_service(Roster())
         self.urls["roster"] = self.tub.registerReference(r, "roster")
         log.msg(" roster is at %s" % self.urls["roster"])
 
-    def startService(self):
-        # note: this class can only be started and stopped once.
-        service.MultiService.startService(self)
-        log.msg("queen running")
-        d = get_local_ip_for()
-        d.addCallback(self._setup_tub)
-        d.addCallback(self._setup_services)
-
index 349133d5664bd591c2d77ae4876a1d1a65952430..8c1bb0768c1e63eebeedfd34a96e7a1069a0be9b 100644 (file)
@@ -67,7 +67,7 @@ class FakeClient:
             return defer.fail(IndexError("no connection to that peer"))
         return defer.succeed(peer)
 
-class NextPeerUploader(upload.Uploader):
+class NextPeerUploader(upload.FileUploader):
     def _got_all_peers(self, res):
         return res
 
@@ -150,4 +150,3 @@ class NextPeer(unittest.TestCase):
                                           ])
         d.addCallback(_check)
         return d
-
index b6dc4de07572d9f7e0bf14706fe5815f821ce578..08b97c0ec57a953a6b939e5102938ff4ab8dab94 100644 (file)
@@ -1,7 +1,10 @@
 
 from twisted.python import failure
 from twisted.internet import defer
+from twisted.application import service
+
 from allmydata.util import idlib
+from allmydata import encode
 
 class NotEnoughPeersError(Exception):
     pass
@@ -14,26 +17,33 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass
 
-class Uploader:
+def upload_a_file(peer, filename):
+    u = Uploader(peer)
+    u.set_filehandle(open(filename,"rb"))
+    u.set_verifierid(hashthingy(filethingy))
+    u.make_encoder()
+
+class FileUploader:
     debug = False
 
     def __init__(self, peer):
         self._peer = peer
 
-    def set_encoder(self, encoder):
-        self._encoder = encoder
+    def set_filehandle(self, filehandle):
+        self._filehandle = filehandle
+        filehandle.seek(0, 2)
+        self._size = filehandle.tell()
+        filehandle.seek(0)
+
+    def make_encoder(self):
+        self._encoder = encode.Encoder(self._filehandle, 4)
+        self._shares = 4
+        self._share_size = self._size
 
     def set_verifierid(self, vid):
         assert isinstance(vid, str)
         self._verifierid = vid
 
-    def set_filesize(self, size):
-        self._size = size
-
-    def _calculate_parameters(self):
-        self._shares = 100
-        self._share_size = self._size / 25
-
 
     def start(self):
         # first step: who should we upload to?
@@ -111,3 +121,29 @@ class Uploader:
         d = self._encoder.do_upload(self.landlords)
         return d
 
+def netstring(s):
+    return "%d:%s," % (len(s), s)
+
+class Uploader(service.MultiService):
+    """I am a service that allows file uploading.
+    """
+    name = "uploader"
+
+    def _compute_verifierid(self, filehandle):
+        hasher = sha.new(netstring("allmydata_v1_verifierid"))
+        f.seek(0)
+        hasher.update(f.read())
+        f.seek(0)
+        # note: this is only of the plaintext data, no encryption yet
+        return hasher.digest()
+
+    def upload_file_by_name(self, filename):
+        assert self.parent
+        assert self.running
+        f = open(filename, "rb")
+        u = FileUploader(self.parent)
+        u.set_verifierid(self._compute_verifierid(f))
+        u.make_encoder()
+        d = u.start()
+        return d
+
index 7e25a8f7d4286706ca8385717d78e6779e0133df..16d19dbf9a9743fef213d0f724ecc4a353792683 100644 (file)
@@ -5,7 +5,9 @@ from twisted.application import service
 
 queen_pburl = "pb://jekyv6ghn7zinppk7wcvfmk7o4gw76hb@192.168.1.101:42552/roster"
 yumyum_queen = "pb://cznyjh2pi4bybn3g7pi36bdfnwz356vk@192.168.1.98:56510/roster"
-c = client.Client(yumyum_queen)
+c = client.Client()
+c.set_queen_pburl(yumyum_queen)
+#c.set_queen_pburl(queen_pburl)
 
 application = service.Application("allmydata_client")
 c.setServiceParent(application)