Add a test, add missing imports. refs #2388
diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index b997f306ed2b85a845821f163fb580cf894f144e..bb6dce23096d4d0af7b89973af4e08394a0a460b 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -1,31 +1,32 @@
 import os, stat, time, weakref
-from allmydata.interfaces import RIStorageServer
 from allmydata import node
 
 from zope.interface import implements
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
+from twisted.application import service
 from twisted.application.internet import TimerService
-from foolscap import Referenceable
-from foolscap.logging import log
 from pycryptopp.publickey import rsa
 
 import allmydata
 from allmydata.storage.server import StorageServer
+from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
-from allmydata.immutable.download import Downloader
-from allmydata.immutable.filenode import FileNode, LiteralFileNode
 from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir
+from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
+from allmydata.util.encodingutil import get_filesystem_encoding, \
+     from_utf8_or_none
+from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.abbreviate import parse_abbreviated_size
-from allmydata.uri import LiteralFileURI
-from allmydata.dirnode import NewDirectoryNode
-from allmydata.mutable.filenode import MutableFileNode
+from allmydata.util.time_format import parse_duration, parse_date
 from allmydata.stats import StatsProvider
 from allmydata.history import History
-from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
-     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
+from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
+from allmydata.nodemaker import NodeMaker
+from allmydata.blacklist import Blacklist
+from allmydata.node import OldConfigOptionError
+
 
 KiB=1024
 MiB=1024*KiB
@@ -33,25 +34,90 @@ GiB=1024*MiB
 TiB=1024*GiB
 PiB=1024*TiB
 
-class StubClient(Referenceable):
-    implements(RIStubClient)
-
 def _make_secret():
     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
 
+class SecretHolder:
+    def __init__(self, lease_secret, convergence_secret):
+        self._lease_secret = lease_secret
+        self._convergence_secret = convergence_secret
+
+    def get_renewal_secret(self):
+        return hashutil.my_renewal_secret_hash(self._lease_secret)
+
+    def get_cancel_secret(self):
+        return hashutil.my_cancel_secret_hash(self._lease_secret)
+
+    def get_convergence_secret(self):
+        return self._convergence_secret
+
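(A minimal usage sketch of SecretHolder; the random secrets here are illustrative, whereas in a real node they come from BASEDIR/private/secret and BASEDIR/private/convergence, as init_secrets below shows:)

    import os
    from allmydata.util import hashutil

    lease = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    convergence = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    sh = SecretHolder(lease, convergence)
    # the renewal/cancel secrets are stable hashes of the lease secret
    assert sh.get_renewal_secret() == hashutil.my_renewal_secret_hash(lease)
    assert sh.get_convergence_secret() == convergence
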
+class KeyGenerator:
+    """I create RSA keys for mutable files. Each call to generate() returns a
+    single keypair. The keysize is specified first by the keysize= argument
+    to generate(), then with a default set by set_default_keysize(), then
+    with a built-in default of 2048 bits."""
+    def __init__(self):
+        self._remote = None
+        self.default_keysize = 2048
+
+    def set_remote_generator(self, keygen):
+        self._remote = keygen
+    def set_default_keysize(self, keysize):
+        """Call this to override the size of the RSA keys created for new
+        mutable files which don't otherwise specify a size. This will affect
+        all subsequent calls to generate() without a keysize= argument. The
+        default size is 2048 bits. Test cases should call this method once
+        during setup, to cause me to create smaller keys, so the unit tests
+        run faster."""
+        self.default_keysize = keysize
+
+    def generate(self, keysize=None):
+        """I return a Deferred that fires with a (verifyingkey, signingkey)
+        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
+        keys are used for testing). If you do not provide a keysize, I will
+        use my default, which is set by a call to set_default_keysize(). If
+        set_default_keysize() has never been called, I will create 2048 bit
+        keys."""
+        keysize = keysize or self.default_keysize
+        if self._remote:
+            d = self._remote.callRemote('get_rsa_key_pair', keysize)
+            def make_key_objs((verifying_key, signing_key)):
+                v = rsa.create_verifying_key_from_string(verifying_key)
+                s = rsa.create_signing_key_from_string(signing_key)
+                return v, s
+            d.addCallback(make_key_objs)
+            return d
+        else:
+            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
+            # secs
+            signer = rsa.generate(keysize)
+            verifier = signer.get_verifying_key()
+            return defer.succeed( (verifier, signer) )
+
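(A usage sketch under stated assumptions: the 522-bit keysize is purely for speed, as in unit tests, and the pycryptopp key objects are assumed to offer serialize(), as they do elsewhere in this codebase:)

    keygen = KeyGenerator()
    keygen.set_default_keysize(522)   # small keys so the example runs quickly
    d = keygen.generate()             # Deferred -> (verifyingkey, signingkey)
    def _check((verifier, signer)):
        # the signing key must reproduce the verifying key it was paired with
        assert verifier.serialize() == signer.get_verifying_key().serialize()
    d.addCallback(_check)
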
+class Terminator(service.Service):
+    def __init__(self):
+        self._clients = weakref.WeakKeyDictionary()
+    def register(self, c):
+        self._clients[c] = None
+    def stopService(self):
+        for c in self._clients:
+            c.stop()
+        return service.Service.stopService(self)
+
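(Because Terminator keeps only weak references, clients that are garbage-collected simply drop out of the registry. A sketch, with FakeClient a hypothetical stand-in for anything that has a stop() method:)

    class FakeClient:
        stopped = False
        def stop(self):
            self.stopped = True

    t = Terminator()
    c = FakeClient()
    t.register(c)      # weakly held, so registration creates no cycle
    t.stopService()    # calls .stop() on every client still alive
    assert c.stopped
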
+
 class Client(node.Node, pollmixin.PollMixin):
     implements(IStatsProducer)
 
     PORTNUMFILE = "client.port"
     STOREDIR = 'storage'
     NODETYPE = "client"
-    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
+    EXIT_TRIGGER_FILE = "exit_trigger"
 
     # This means that if a storage server treats me as though I were a
     # 1.0.0 storage client, it will work as they expect.
     OLDEST_SUPPORTED_VERSION = "1.0.0"
 
-    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
+    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
     # is the number of shares required to reconstruct a file. 'desired' means
     # that we will abort an upload unless we can allocate space for at least
     # this many. 'total' is the total number of shares created by encoding.
@@ -66,30 +132,36 @@ class Client(node.Node, pollmixin.PollMixin):
         node.Node.__init__(self, basedir)
         self.started_timestamp = time.time()
         self.logSource="Client"
-        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
+        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
         self.init_introducer_client()
         self.init_stats_provider()
-        self.init_lease_secret()
+        self.init_secrets()
+        self.init_node_key()
         self.init_storage()
         self.init_control()
+        self.helper = None
         if self.get_config("helper", "enabled", False, boolean=True):
             self.init_helper()
-        self.init_client()
-        self._key_generator = None
+        self._key_generator = KeyGenerator()
         key_gen_furl = self.get_config("client", "key_generator.furl", None)
         if key_gen_furl:
             self.init_key_gen(key_gen_furl)
+        self.init_client()
         # ControlServer and Helper are attached after Tub startup
         self.init_ftp_server()
         self.init_sftp_server()
-
-        hotline_file = os.path.join(self.basedir,
-                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
-        if os.path.exists(hotline_file):
-            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
-            self.log("hotline file noticed (%ds old), starting timer" % age)
-            hotline = TimerService(1.0, self._check_hotline, hotline_file)
-            hotline.setServiceParent(self)
+        self.init_drop_uploader()
+
+        # If the node sees an exit_trigger file, it will poll every second
+        # to see whether the file still exists, and what its mtime is. If
+        # the file does not exist or has not been modified for a given
+        # timeout, the node will exit.
+        exit_trigger_file = os.path.join(self.basedir,
+                                         self.EXIT_TRIGGER_FILE)
+        if os.path.exists(exit_trigger_file):
+            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
+            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
+            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
+            exit_trigger.setServiceParent(self)
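
(Anything that wants to keep such a node alive must refresh the trigger file's mtime at least every 120 seconds, the cutoff enforced by _check_exit_trigger below. A hedged sketch, with basedir and the loop condition hypothetical:)

    import os, time

    path = os.path.join(basedir, Client.EXIT_TRIGGER_FILE)
    open(path, "a").close()      # ensure the trigger file exists
    while test_still_running():  # hypothetical condition
        os.utime(path, None)     # touch: the node exits ~120s after this stops
        time.sleep(60)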
 
         # this needs to happen last, so it can use getServiceNamed() to
         # acquire references to StorageServer and other web-statusable things
@@ -97,28 +169,24 @@ class Client(node.Node, pollmixin.PollMixin):
         if webport:
             self.init_web(webport) # strports string
 
-    def read_old_config_files(self):
-        node.Node.read_old_config_files(self)
-        copy = self._copy_config_from_file
-        copy("introducer.furl", "client", "introducer.furl")
-        copy("helper.furl", "client", "helper.furl")
-        copy("key_generator.furl", "client", "key_generator.furl")
-        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
-        if os.path.exists(os.path.join(self.basedir, "no_storage")):
-            self.set_config("storage", "enabled", "false")
-        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
-            self.set_config("storage", "readonly", "true")
-        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
-            self.set_config("storage", "debug_discard", "true")
-        if os.path.exists(os.path.join(self.basedir, "run_helper")):
-            self.set_config("helper", "enabled", "true")
+    def _sequencer(self):
+        seqnum_s = self.get_config_from_file("announcement-seqnum")
+        if not seqnum_s:
+            seqnum_s = "0"
+        seqnum = int(seqnum_s.strip())
+        seqnum += 1 # increment
+        self.write_config("announcement-seqnum", "%d\n" % seqnum)
+        nonce = _make_secret().strip()
+        return seqnum, nonce
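
(The sequencer stamps each announcement with a strictly increasing sequence number plus a fresh nonce, letting subscribers discard stale announcements. Illustratively, for an already-configured client:)

    s1, n1 = client._sequencer()
    s2, n2 = client._sequencer()
    assert s2 == s1 + 1   # persisted via write_config, so it survives restarts
    assert n1 != n2       # nonces are fresh random strings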
 
     def init_introducer_client(self):
         self.introducer_furl = self.get_config("client", "introducer.furl")
         ic = IntroducerClient(self.tub, self.introducer_furl,
                               self.nickname,
                               str(allmydata.__full_version__),
-                              str(self.OLDEST_SUPPORTED_VERSION))
+                              str(self.OLDEST_SUPPORTED_VERSION),
+                              self.get_app_versions(),
+                              self._sequencer)
         self.introducer_client = ic
         # hold off on starting the IntroducerClient until our tub has been
         # started, so we'll have a useful address on our RemoteReference, so
@@ -126,8 +194,6 @@ class Client(node.Node, pollmixin.PollMixin):
         d = self.when_tub_ready()
         def _start_introducer_client(res):
             ic.setServiceParent(self)
-            # nodes that want to upload and download will need storage servers
-            ic.subscribe_to("storage")
         d.addCallback(_start_introducer_client)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="URyI5w")
@@ -141,9 +207,53 @@ class Client(node.Node, pollmixin.PollMixin):
     def get_stats(self):
         return { 'node.uptime': time.time() - self.started_timestamp }
 
-    def init_lease_secret(self):
-        secret_s = self.get_or_create_private_config("secret", _make_secret)
-        self._lease_secret = base32.a2b(secret_s)
+    def init_secrets(self):
+        lease_s = self.get_or_create_private_config("secret", _make_secret)
+        lease_secret = base32.a2b(lease_s)
+        convergence_s = self.get_or_create_private_config('convergence',
+                                                          _make_secret)
+        self.convergence = base32.a2b(convergence_s)
+        self._secret_holder = SecretHolder(lease_secret, self.convergence)
+
+    def init_node_key(self):
+        # we only create the key once. On all subsequent runs, we re-use the
+        # existing key
+        def _make_key():
+            sk_vs,vk_vs = keyutil.make_keypair()
+            return sk_vs+"\n"
+        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
+        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
+        self.write_config("node.pubkey", vk_vs+"\n")
+        self._node_key = sk
+
+    def get_long_nodeid(self):
+        # this matches what IServer.get_longname() says about us elsewhere
+        vk_bytes = self._node_key.get_verifying_key_bytes()
+        return "v0-"+base32.b2a(vk_bytes)
+
+    def get_long_tubid(self):
+        return idlib.nodeid_b2a(self.nodeid)
+
+    def _init_permutation_seed(self, ss):
+        seed = self.get_config_from_file("permutation-seed")
+        if not seed:
+            have_shares = ss.have_shares()
+            if have_shares:
+                # if the server has shares but not a recorded
+                # permutation-seed, then it has been around since pre-#466
+                # days, and the clients who uploaded those shares used our
+                # TubID as a permutation-seed. We should keep using that same
+                # seed to keep the shares in the same place in the permuted
+                # ring, so those clients don't have to perform excessive
+                # searches.
+                seed = base32.b2a(self.nodeid)
+            else:
+                # otherwise, we're free to use the more natural seed of our
+                # pubkey-based serverid
+                vk_bytes = self._node_key.get_verifying_key_bytes()
+                seed = base32.b2a(vk_bytes)
+            self.write_config("permutation-seed", seed+"\n")
+        return seed.strip()
 
     def init_storage(self):
         # should we run a storage server (and publish it for others to use)?
@@ -154,67 +264,141 @@ class Client(node.Node, pollmixin.PollMixin):
         storedir = os.path.join(self.basedir, self.STOREDIR)
 
         data = self.get_config("storage", "reserved_space", None)
-        reserved = None
         try:
             reserved = parse_abbreviated_size(data)
         except ValueError:
             log.msg("[storage]reserved_space= contains unparseable value %s"
                     % data)
+            raise
         if reserved is None:
             reserved = 0
         discard = self.get_config("storage", "debug_discard", False,
                                   boolean=True)
+
+        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
+        if expire:
+            mode = self.get_config("storage", "expire.mode") # require a mode
+        else:
+            mode = self.get_config("storage", "expire.mode", "age")
+
+        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
+        if o_l_d is not None:
+            o_l_d = parse_duration(o_l_d)
+
+        cutoff_date = None
+        if mode == "cutoff-date":
+            cutoff_date = self.get_config("storage", "expire.cutoff_date")
+            cutoff_date = parse_date(cutoff_date)
+
+        sharetypes = []
+        if self.get_config("storage", "expire.immutable", True, boolean=True):
+            sharetypes.append("immutable")
+        if self.get_config("storage", "expire.mutable", True, boolean=True):
+            sharetypes.append("mutable")
+        expiration_sharetypes = tuple(sharetypes)
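
(These options map onto the [storage] section of tahoe.cfg; an illustrative, not authoritative, stanza enabling cutoff-date expiry, with values invented for the example:)

    [storage]
    enabled = true
    reserved_space = 1G
    expire.enabled = true
    expire.mode = cutoff-date
    expire.cutoff_date = 2009-01-01
    expire.immutable = true
    expire.mutable = true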
+
         ss = StorageServer(storedir, self.nodeid,
                            reserved_space=reserved,
                            discard_storage=discard,
                            readonly_storage=readonly,
-                           stats_provider=self.stats_provider)
+                           stats_provider=self.stats_provider,
+                           expiration_enabled=expire,
+                           expiration_mode=mode,
+                           expiration_override_lease_duration=o_l_d,
+                           expiration_cutoff_date=cutoff_date,
+                           expiration_sharetypes=expiration_sharetypes)
         self.add_service(ss)
+
         d = self.when_tub_ready()
         # we can't do registerReference until the Tub is ready
         def _publish(res):
-            furl_file = os.path.join(self.basedir, "private", "storage.furl")
+            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
             furl = self.tub.registerReference(ss, furlFile=furl_file)
-            ri_name = RIStorageServer.__remote_name__
-            self.introducer_client.publish(furl, "storage", ri_name)
+            ann = {"anonymous-storage-FURL": furl,
+                   "permutation-seed-base32": self._init_permutation_seed(ss),
+                   }
+            self.introducer_client.publish("storage", ann, self._node_key)
         d.addCallback(_publish)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="aLGBKw")
 
     def init_client(self):
         helper_furl = self.get_config("client", "helper.furl", None)
-        DEP = self.DEFAULT_ENCODING_PARAMETERS
+        if helper_furl in ("None", ""):
+            helper_furl = None
+
+        DEP = self.encoding_params
         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
-        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
-        self.convergence = base32.a2b(convergence_s)
-        self._node_cache = weakref.WeakValueDictionary() # uri -> node
-        self.add_service(History(self.stats_provider))
-        self.add_service(Uploader(helper_furl, self.stats_provider))
-        download_cachedir = os.path.join(self.basedir,
-                                         "private", "cache", "download")
-        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
-        self.download_cache.setServiceParent(self)
-        self.add_service(Downloader(self.stats_provider))
-        self.init_stub_client()
-
-    def init_stub_client(self):
-        def _publish(res):
-            # we publish an empty object so that the introducer can count how
-            # many clients are connected and see what versions they're
-            # running.
-            sc = StubClient()
-            furl = self.tub.registerReference(sc)
-            ri_name = RIStubClient.__remote_name__
-            self.introducer_client.publish(furl, "stub_client", ri_name)
-        d = self.when_tub_ready()
-        d.addCallback(_publish)
-        d.addErrback(log.err, facility="tahoe.init",
-                     level=log.BAD, umid="OEHq3g")
+
+        self.init_client_storage_broker()
+        self.history = History(self.stats_provider)
+        self.terminator = Terminator()
+        self.terminator.setServiceParent(self)
+        self.add_service(Uploader(helper_furl, self.stats_provider,
+                                  self.history))
+        self.init_blacklist()
+        self.init_nodemaker()
+
+    def init_client_storage_broker(self):
+        # create a StorageFarmBroker object, for use by Uploader/Downloader
+        # (and everybody else who wants to use storage servers)
+        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
+        self.storage_broker = sb
+
+        # load static server specifications from tahoe.cfg, if any.
+        # Not quite ready yet.
+        #if self.config.has_section("client-server-selection"):
+        #    server_params = {} # maps serverid to dict of parameters
+        #    for (name, value) in self.config.items("client-server-selection"):
+        #        pieces = name.split(".")
+        #        if pieces[0] == "server":
+        #            serverid = pieces[1]
+        #            if serverid not in server_params:
+        #                server_params[serverid] = {}
+        #            server_params[serverid][pieces[2]] = value
+        #    for serverid, params in server_params.items():
+        #        server_type = params.pop("type")
+        #        if server_type == "tahoe-foolscap":
+        #            s = storage_client.NativeStorageClient(*params)
+        #        else:
+        #            msg = ("unrecognized server type '%s' in "
+        #                   "tahoe.cfg [client-server-selection]server.%s.type"
+        #                   % (server_type, serverid))
+        #            raise storage_client.UnknownServerTypeError(msg)
+        #        sb.add_server(s.serverid, s)
+
+        # check to see if we're supposed to use the introducer too
+        if self.get_config("client-server-selection", "use_introducer",
+                           default=True, boolean=True):
+            sb.use_introducer(self.introducer_client)
+
+    def get_storage_broker(self):
+        return self.storage_broker
+
+    def init_blacklist(self):
+        fn = os.path.join(self.basedir, "access.blacklist")
+        self.blacklist = Blacklist(fn)
+
+    def init_nodemaker(self):
+        default = self.get_config("client", "mutable.format", default="SDMF")
+        if default.upper() == "MDMF":
+            self.mutable_file_default = MDMF_VERSION
+        else:
+            self.mutable_file_default = SDMF_VERSION
+        self.nodemaker = NodeMaker(self.storage_broker,
+                                   self._secret_holder,
+                                   self.get_history(),
+                                   self.getServiceNamed("uploader"),
+                                   self.terminator,
+                                   self.get_encoding_parameters(),
+                                   self.mutable_file_default,
+                                   self._key_generator,
+                                   self.blacklist)
 
     def get_history(self):
-        return self.getServiceNamed("history")
+        return self.history
 
     def init_control(self):
         d = self.when_tub_ready()
@@ -230,16 +414,17 @@ class Client(node.Node, pollmixin.PollMixin):
     def init_helper(self):
         d = self.when_tub_ready()
         def _publish(self):
-            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
-            h.setServiceParent(self)
+            self.helper = Helper(os.path.join(self.basedir, "helper"),
+                                 self.storage_broker, self._secret_holder,
+                                 self.stats_provider, self.history)
             # TODO: this is confusing. BASEDIR/private/helper.furl is created
             # by the helper. BASEDIR/helper.furl is consumed by the client
             # who wants to use the helper. I like having the filename be the
             # same, since that makes 'cp' work smoothly, but the difference
             # between config inputs and generated outputs is hard to see.
             helper_furlfile = os.path.join(self.basedir,
-                                           "private", "helper.furl")
-            self.tub.registerReference(h, furlFile=helper_furlfile)
+                                           "private", "helper.furl").encode(get_filesystem_encoding())
+            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
         d.addCallback(_publish)
         d.addErrback(log.err, facility="tahoe.init",
                      level=log.BAD, umid="K0mW5w")
@@ -253,30 +438,31 @@ class Client(node.Node, pollmixin.PollMixin):
                      level=log.BAD, umid="z9DMzw")
 
     def _got_key_generator(self, key_generator):
-        self._key_generator = key_generator
+        self._key_generator.set_remote_generator(key_generator)
         key_generator.notifyOnDisconnect(self._lost_key_generator)
 
     def _lost_key_generator(self):
-        self._key_generator = None
+        self._key_generator.set_remote_generator(None)
 
-    def get_servers(self, service_name):
-        """ Return frozenset of (peerid, versioned-rref) """
-        assert isinstance(service_name, str)
-        return self.introducer_client.get_peers(service_name)
+    def set_default_mutable_keysize(self, keysize):
+        self._key_generator.set_default_keysize(keysize)
 
     def init_web(self, webport):
         self.log("init_web(webport=%s)", args=(webport,))
 
         from allmydata.webish import WebishServer
         nodeurl_path = os.path.join(self.basedir, "node.url")
-        staticdir = self.get_config("node", "web.static", "public_html")
-        staticdir = os.path.expanduser(staticdir)
+        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
+        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
         ws = WebishServer(self, webport, nodeurl_path, staticdir)
         self.add_service(ws)
 
     def init_ftp_server(self):
         if self.get_config("ftpd", "enabled", False, boolean=True):
-            accountfile = self.get_config("ftpd", "accounts.file", None)
+            accountfile = from_utf8_or_none(
+                self.get_config("ftpd", "accounts.file", None))
+            if accountfile:
+                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
             accounturl = self.get_config("ftpd", "accounts.url", None)
             ftp_portstr = self.get_config("ftpd", "port", "8021")
 
@@ -286,54 +472,61 @@ class Client(node.Node, pollmixin.PollMixin):
 
     def init_sftp_server(self):
         if self.get_config("sftpd", "enabled", False, boolean=True):
-            accountfile = self.get_config("sftpd", "accounts.file", None)
+            accountfile = from_utf8_or_none(
+                self.get_config("sftpd", "accounts.file", None))
+            if accountfile:
+                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
             accounturl = self.get_config("sftpd", "accounts.url", None)
             sftp_portstr = self.get_config("sftpd", "port", "8022")
-            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
-            privkey_file = self.get_config("sftpd", "host_privkey_file")
+            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
+            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))
 
             from allmydata.frontends import sftpd
             s = sftpd.SFTPServer(self, accountfile, accounturl,
                                  sftp_portstr, pubkey_file, privkey_file)
             s.setServiceParent(self)
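
(A hedged sketch of the matching [sftpd] stanza; the option names are the ones read above, while the key paths are illustrative:)

    [sftpd]
    enabled = true
    port = 8022
    host_pubkey_file = private/ssh_host_rsa_key.pub
    host_privkey_file = private/ssh_host_rsa_key
    accounts.file = private/accounts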
 
-    def _check_hotline(self, hotline_file):
-        if os.path.exists(hotline_file):
-            mtime = os.stat(hotline_file)[stat.ST_MTIME]
+    def init_drop_uploader(self):
+        if self.get_config("drop_upload", "enabled", False, boolean=True):
+            if self.get_config("drop_upload", "upload.dircap", None):
+                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
+                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")
+
+            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
+            local_dir_utf8 = self.get_config("drop_upload", "local.directory")
+
+            try:
+                from allmydata.frontends import drop_upload
+                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
+                s.setServiceParent(self)
+                s.startService()
+            except Exception, e:
+                self.log("couldn't start drop-uploader: %r", args=(e,))
+
+    def _check_exit_trigger(self, exit_trigger_file):
+        if os.path.exists(exit_trigger_file):
+            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
             if mtime > time.time() - 120.0:
                 return
             else:
-                self.log("hotline file too old, shutting down")
+                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
         else:
-            self.log("hotline file missing, shutting down")
+            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
         reactor.stop()
 
-    def get_all_peerids(self):
-        return self.introducer_client.get_all_peerids()
-    def get_nickname_for_peerid(self, peerid):
-        return self.introducer_client.get_nickname_for_peerid(peerid)
-
-    def get_permuted_peers(self, service_name, key):
-        """
-        @return: list of (peerid, connection,)
-        """
-        assert isinstance(service_name, str)
-        assert isinstance(key, str)
-        return self.introducer_client.get_permuted_peers(service_name, key)
-
     def get_encoding_parameters(self):
-        return self.DEFAULT_ENCODING_PARAMETERS
+        return self.encoding_params
 
     def connected_to_introducer(self):
         if self.introducer_client:
             return self.introducer_client.connected_to_introducer()
         return False
 
-    def get_renewal_secret(self):
-        return hashutil.my_renewal_secret_hash(self._lease_secret)
+    def get_renewal_secret(self): # this will go away
+        return self._secret_holder.get_renewal_secret()
 
     def get_cancel_secret(self):
-        return hashutil.my_cancel_secret_hash(self._lease_secret)
+        return self._secret_holder.get_cancel_secret()
 
     def debug_wait_for_client_connections(self, num_clients):
         """Return a Deferred that fires (with None) when we have connections
@@ -341,8 +534,7 @@ class Client(node.Node, pollmixin.PollMixin):
         temporary test network and need to know when it is safe to proceed
         with an upload or download."""
         def _check():
-            current_clients = list(self.get_all_peerids())
-            return len(current_clients) >= num_clients
+            return len(self.storage_broker.get_connected_servers()) >= num_clients
         d = self.poll(_check, 0.5)
         d.addCallback(lambda res: None)
         return d
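
(Typical use in a test harness; proceed_with_test is hypothetical:)

    d = client.debug_wait_for_client_connections(5)
    d.addCallback(lambda ign: proceed_with_test())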
@@ -352,79 +544,23 @@ class Client(node.Node, pollmixin.PollMixin):
     # dirnodes. The first takes a URI and produces a filenode or (new-style)
     # dirnode. The other three create brand-new filenodes/dirnodes.
 
-    def create_node_from_uri(self, u):
-        # this returns synchronously.
-        u = IURI(u)
-        u_s = u.to_string()
-        if u_s not in self._node_cache:
-            if IReadonlyNewDirectoryURI.providedBy(u):
-                # new-style read-only dirnodes
-                node = NewDirectoryNode(self).init_from_uri(u)
-            elif INewDirectoryURI.providedBy(u):
-                # new-style dirnodes
-                node = NewDirectoryNode(self).init_from_uri(u)
-            elif IFileURI.providedBy(u):
-                if isinstance(u, LiteralFileURI):
-                    node = LiteralFileNode(u, self) # LIT
-                else:
-                    key = base32.b2a(u.storage_index)
-                    cachefile = self.download_cache.get_file(key)
-                    node = FileNode(u, self, cachefile) # CHK
-            else:
-                assert IMutableFileURI.providedBy(u), u
-                node = MutableFileNode(self).init_from_uri(u)
-            self._node_cache[u_s] = node
-        return self._node_cache[u_s]
-
-    def create_empty_dirnode(self):
-        n = NewDirectoryNode(self)
-        d = n.create(self._generate_pubprivkeys)
-        d.addCallback(lambda res: n)
-        return d
+    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
+        # This returns synchronously.
+        # Note that it does *not* validate the write_uri and read_uri; instead we
+        # may get an opaque node if there were any problems.
+        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
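
(Because validation is deferred, callers may pass in any cap string; a sketch, where the cap literal is a stand-in rather than a real cap:)

    node = client.create_node_from_uri("URI:CHK:...")  # returns synchronously
    # a malformed or unrecognized cap yields an opaque/unknown node here;
    # errors surface later, when the node is actually used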
 
-    def create_mutable_file(self, contents=""):
-        n = MutableFileNode(self)
-        d = n.create(contents, self._generate_pubprivkeys)
-        d.addCallback(lambda res: n)
+    def create_dirnode(self, initial_children={}, version=None):
+        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
         return d
 
-    def _generate_pubprivkeys(self, key_size):
-        if self._key_generator:
-            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
-            def make_key_objs((verifying_key, signing_key)):
-                v = rsa.create_verifying_key_from_string(verifying_key)
-                s = rsa.create_signing_key_from_string(signing_key)
-                return v, s
-            d.addCallback(make_key_objs)
-            return d
-        else:
-            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
-            # secs
-            signer = rsa.generate(key_size)
-            verifier = signer.get_verifying_key()
-            return verifier, signer
+    def create_immutable_dirnode(self, children, convergence=None):
+        return self.nodemaker.create_immutable_directory(children, convergence)
+
+    def create_mutable_file(self, contents=None, keysize=None, version=None):
+        return self.nodemaker.create_mutable_file(contents, keysize,
+                                                  version=version)
 
     def upload(self, uploadable):
         uploader = self.getServiceNamed("uploader")
-        return uploader.upload(uploadable, history=self.get_history())
-
-
-    def list_all_upload_statuses(self):
-        return self.get_history().list_all_upload_statuses()
-
-    def list_all_download_statuses(self):
-        return self.get_history().list_all_download_statuses()
-
-    def list_all_mapupdate_statuses(self):
-        return self.get_history().list_all_mapupdate_statuses()
-    def list_all_publish_statuses(self):
-        return self.get_history().list_all_publish_statuses()
-    def list_all_retrieve_statuses(self):
-        return self.get_history().list_all_retrieve_statuses()
-
-    def list_all_helper_statuses(self):
-        try:
-            helper = self.getServiceNamed("helper")
-        except KeyError:
-            return []
-        return helper.get_all_upload_statuses()
+        return uploader.upload(uploadable)