From d13f9289ef6c79e7c589f10d7caf3fad629646dc Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sat, 2 Dec 2006 18:27:18 -0700 Subject: [PATCH] rearrange client setup, factor out common Node functionality, add Uploader service to client --- allmydata/client.py | 82 +++++++++++++++-------------------- allmydata/node.py | 80 ++++++++++++++++++++++++++++++++++ allmydata/queen.py | 62 ++++---------------------- allmydata/test/test_upload.py | 3 +- allmydata/upload.py | 56 +++++++++++++++++++----- client.tac | 4 +- 6 files changed, 174 insertions(+), 113 deletions(-) create mode 100644 allmydata/node.py diff --git a/allmydata/client.py b/allmydata/client.py index 7ad41053..df17ac72 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -1,12 +1,11 @@ -import os.path import sha -from foolscap import Tub, Referenceable +from foolscap import Referenceable from twisted.application import service from twisted.python import log -from allmydata.util.iputil import get_local_ip_for from zope.interface import implements from allmydata.interfaces import RIClient +from allmydata import node from twisted.internet import defer, reactor # this BlockingResolver is because otherwise unit tests must sometimes deal @@ -16,62 +15,53 @@ from twisted.internet.base import BlockingResolver reactor.installResolver(BlockingResolver()) from allmydata.storageserver import StorageServer +from allmydata.upload import Uploader from allmydata.util import idlib -class Client(service.MultiService, Referenceable): +class Client(node.Node, Referenceable): implements(RIClient) CERTFILE = "client.pem" + PORTNUMFILE = "client.port" STOREDIR = 'storage' + NODETYPE = "client" - def __init__(self, queen_pburl): - service.MultiService.__init__(self) - self.queen_pburl = queen_pburl - if os.path.exists(self.CERTFILE): - self.tub = Tub(certData=open(self.CERTFILE, "rb").read()) - else: - self.tub = Tub() - f = open(self.CERTFILE, "wb") - f.write(self.tub.getCertData()) - f.close() - self.nodeid = idlib.a2b(self.tub.tubID) - self.tub.setServiceParent(self) + def __init__(self, basedir="."): + node.Node.__init__(self, basedir) self.queen = None # self.queen is either None or a RemoteReference self.all_peers = set() self.connections = {} - s = StorageServer(self.STOREDIR) - s.setServiceParent(self) - - AUTHKEYSFILEBASE = "authorized_keys." - for f in os.listdir("."): - if f.startswith(AUTHKEYSFILEBASE): - portnum = int(f[len(AUTHKEYSFILEBASE):]) - from allmydata import manhole - m = manhole.AuthorizedKeysManhole(portnum, f) - m.setServiceParent(self) - log.msg("AuthorizedKeysManhole listening on %d" % portnum) - - def _setup_tub(self, local_ip): - portnum = 0 - l = self.tub.listenOn("tcp:%d" % portnum) - self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum())) - self.my_pburl = self.tub.registerReference(self) + self.add_service(StorageServer(self.STOREDIR)) + self.add_service(Uploader()) + self.queen_pburl = None + self.queen_connector = None + self.my_pburl = None - def startService(self): - # note: this class can only be started and stopped once. - service.MultiService.startService(self) - d = get_local_ip_for() - d.addCallback(self._setup_tub) - if self.queen_pburl: - # TODO: maybe this should wait for tub.setLocation ? - self.connector = self.tub.connectTo(self.queen_pburl, - self._got_queen) - else: + def set_queen_pburl(self, queen_pburl): + self.queen_pburl = queen_pburl + self.maybe_connect_to_queen() + + def maybe_connect_to_queen(self): + if not self.running: + return + if not self.my_pburl: + return + if self.queen_connector: + return + if not self.queen_pburl: log.msg("no queen_pburl, cannot connect") + return + self.queen_connector = self.tub.connectTo(self.queen_pburl, + self._got_queen) + + def tub_ready(self, tub): + self.my_pburl = self.tub.registerReference(self) + self.maybe_connect_to_queen() def stopService(self): - if self.queen_pburl: - self.connector.stopConnecting() - service.MultiService.stopService(self) + if self.queen_connector: + self.queen_connector.stopConnecting() + self.queen_connector = None + return service.MultiService.stopService(self) def _got_queen(self, queen): log.msg("connected to queen") diff --git a/allmydata/node.py b/allmydata/node.py new file mode 100644 index 00000000..201b7540 --- /dev/null +++ b/allmydata/node.py @@ -0,0 +1,80 @@ + +from twisted.application import service +import os.path +from foolscap import Tub +from allmydata.util.iputil import get_local_ip_for +from allmydata.util import idlib +from twisted.python import log + +class Node(service.MultiService): + # this implements common functionality of both Client nodes and the Queen + # node. + NODETYPE = "unknown NODETYPE" + PORTNUMFILE = None + CERTFILE = None + + def __init__(self, basedir="."): + service.MultiService.__init__(self) + self.basedir = os.path.abspath(basedir) + assert self.CERTFILE, "Your node.Node subclass must provide CERTFILE" + certfile = os.path.join(self.basedir, self.CERTFILE) + if os.path.exists(certfile): + f = open(certfile, "rb") + self.tub = Tub(certData=f.read()) + f.close() + else: + self.tub = Tub() + f = open(certfile, "wb") + f.write(self.tub.getCertData()) + f.close() + self.nodeid = idlib.a2b(self.tub.tubID) + portnum = 0 + assert self.PORTNUMFILE, "Your node.Node subclass must provide PORTNUMFILE" + self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE) + if os.path.exists(self._portnumfile): + portnum = int(open(self._portnumfile, "r").read()) + self.tub.listenOn("tcp:%d" % portnum) + # we must wait until our service has started before we can find out + # our IP address and thus do tub.setLocation, and we can't register + # any services with the Tub until after that point + self.tub.setServiceParent(self) + + AUTHKEYSFILEBASE = "authorized_keys." + for f in os.listdir(self.basedir): + if f.startswith(AUTHKEYSFILEBASE): + keyfile = os.path.join(self.basedir, f) + portnum = int(f[len(AUTHKEYSFILEBASE):]) + from allmydata import manhole + m = manhole.AuthorizedKeysManhole(portnum, keyfile) + m.setServiceParent(self) + log.msg("AuthorizedKeysManhole listening on %d" % portnum) + + def _setup_tub(self, local_ip): + l = self.tub.getListeners()[0] + portnum = l.getPortnum() + self.tub.setLocation("%s:%d" % (local_ip, portnum)) + if not os.path.exists(self._portnumfile): + # record which port we're listening on, so we can grab the same + # one next time + f = open(self._portnumfile, "w") + f.write("%d\n" % portnum) + f.close() + self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum())) + return self.tub + + def tub_ready(self, tub): + # this is called when the Tub has a location + pass + + def add_service(self, s): + s.setServiceParent(self) + return s + + def startService(self): + # note: this class can only be started and stopped once. + service.MultiService.startService(self) + d = get_local_ip_for() + d.addCallback(self._setup_tub) + d.addCallback(self.tub_ready) + d.addCallback(lambda res: log.msg("%s running" % self.NODETYPE)) + diff --git a/allmydata/queen.py b/allmydata/queen.py index 0a75a930..5db05788 100644 --- a/allmydata/queen.py +++ b/allmydata/queen.py @@ -1,13 +1,12 @@ -from foolscap import Tub, Referenceable +from foolscap import Referenceable from foolscap.eventual import eventually from twisted.application import service from twisted.python import log -import os.path -from allmydata.util.iputil import get_local_ip_for from allmydata.util import idlib from zope.interface import implements from allmydata.interfaces import RIQueenRoster +from allmydata import node class Roster(service.MultiService, Referenceable): implements(RIQueenRoster) @@ -45,62 +44,17 @@ class Roster(service.MultiService, Referenceable): -class Queen(service.MultiService): +class Queen(node.Node): CERTFILE = "queen.pem" PORTNUMFILE = "queen.port" + NODETYPE = "queen" - def __init__(self): - service.MultiService.__init__(self) - if os.path.exists(self.CERTFILE): - self.tub = Tub(certData=open(self.CERTFILE, "rb").read()) - else: - self.tub = Tub() - f = open(self.CERTFILE, "wb") - f.write(self.tub.getCertData()) - f.close() - portnum = 0 - if os.path.exists(self.PORTNUMFILE): - portnum = int(open(self.PORTNUMFILE, "r").read()) - self.tub.listenOn("tcp:%d" % portnum) - # we must wait until our service has started before we can find out - # our IP address and thus do tub.setLocation, and we can't register - # any services with the Tub until after that point - self.tub.setServiceParent(self) + def __init__(self, basedir="."): + node.Node.__init__(self, basedir) self.urls = {} - AUTHKEYSFILEBASE = "authorized_keys." - for f in os.listdir("."): - if f.startswith(AUTHKEYSFILEBASE): - portnum = int(f[len(AUTHKEYSFILEBASE):]) - from allmydata import manhole - m = manhole.AuthorizedKeysManhole(portnum, f) - m.setServiceParent(self) - log.msg("AuthorizedKeysManhole listening on %d" % portnum) - - def _setup_tub(self, local_ip): - l = self.tub.getListeners()[0] - portnum = l.getPortnum() - self.tub.setLocation("%s:%d" % (local_ip, portnum)) - if not os.path.exists(self.PORTNUMFILE): - # record which port we're listening on, so we can grab the same - # one next time - f = open(self.PORTNUMFILE, "w") - f.write("%d\n" % portnum) - f.close() - self.tub.setLocation("%s:%d" % (local_ip, l.getPortnum())) - return local_ip - - def _setup_services(self, local_ip): - r = Roster() - r.setServiceParent(self) + def tub_ready(self, tub): + r = self.add_service(Roster()) self.urls["roster"] = self.tub.registerReference(r, "roster") log.msg(" roster is at %s" % self.urls["roster"]) - def startService(self): - # note: this class can only be started and stopped once. - service.MultiService.startService(self) - log.msg("queen running") - d = get_local_ip_for() - d.addCallback(self._setup_tub) - d.addCallback(self._setup_services) - diff --git a/allmydata/test/test_upload.py b/allmydata/test/test_upload.py index 349133d5..8c1bb076 100644 --- a/allmydata/test/test_upload.py +++ b/allmydata/test/test_upload.py @@ -67,7 +67,7 @@ class FakeClient: return defer.fail(IndexError("no connection to that peer")) return defer.succeed(peer) -class NextPeerUploader(upload.Uploader): +class NextPeerUploader(upload.FileUploader): def _got_all_peers(self, res): return res @@ -150,4 +150,3 @@ class NextPeer(unittest.TestCase): ]) d.addCallback(_check) return d - diff --git a/allmydata/upload.py b/allmydata/upload.py index b6dc4de0..08b97c0e 100644 --- a/allmydata/upload.py +++ b/allmydata/upload.py @@ -1,7 +1,10 @@ from twisted.python import failure from twisted.internet import defer +from twisted.application import service + from allmydata.util import idlib +from allmydata import encode class NotEnoughPeersError(Exception): pass @@ -14,26 +17,33 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass -class Uploader: +def upload_a_file(peer, filename): + u = Uploader(peer) + u.set_filehandle(open(filename,"rb")) + u.set_verifierid(hashthingy(filethingy)) + u.make_encoder() + +class FileUploader: debug = False def __init__(self, peer): self._peer = peer - def set_encoder(self, encoder): - self._encoder = encoder + def set_filehandle(self, filehandle): + self._filehandle = filehandle + filehandle.seek(0, 2) + self._size = filehandle.tell() + filehandle.seek(0) + + def make_encoder(self): + self._encoder = encode.Encoder(self._filehandle, 4) + self._shares = 4 + self._share_size = self._size def set_verifierid(self, vid): assert isinstance(vid, str) self._verifierid = vid - def set_filesize(self, size): - self._size = size - - def _calculate_parameters(self): - self._shares = 100 - self._share_size = self._size / 25 - def start(self): # first step: who should we upload to? @@ -111,3 +121,29 @@ class Uploader: d = self._encoder.do_upload(self.landlords) return d +def netstring(s): + return "%d:%s," % (len(s), s) + +class Uploader(service.MultiService): + """I am a service that allows file uploading. + """ + name = "uploader" + + def _compute_verifierid(self, filehandle): + hasher = sha.new(netstring("allmydata_v1_verifierid")) + f.seek(0) + hasher.update(f.read()) + f.seek(0) + # note: this is only of the plaintext data, no encryption yet + return hasher.digest() + + def upload_file_by_name(self, filename): + assert self.parent + assert self.running + f = open(filename, "rb") + u = FileUploader(self.parent) + u.set_verifierid(self._compute_verifierid(f)) + u.make_encoder() + d = u.start() + return d + diff --git a/client.tac b/client.tac index 7e25a8f7..16d19dbf 100644 --- a/client.tac +++ b/client.tac @@ -5,7 +5,9 @@ from twisted.application import service queen_pburl = "pb://jekyv6ghn7zinppk7wcvfmk7o4gw76hb@192.168.1.101:42552/roster" yumyum_queen = "pb://cznyjh2pi4bybn3g7pi36bdfnwz356vk@192.168.1.98:56510/roster" -c = client.Client(yumyum_queen) +c = client.Client() +c.set_queen_pburl(yumyum_queen) +#c.set_queen_pburl(queen_pburl) application = service.Application("allmydata_client") c.setServiceParent(application) -- 2.45.2