-import os, stat, time
-from allmydata.interfaces import RIStorageServer
+import os, stat, time, weakref
from allmydata import node
from zope.interface import implements
from twisted.internet import reactor, defer
+from twisted.application import service
from twisted.application.internet import TimerService
-from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
-from allmydata.immutable.download import Downloader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir, log
+from allmydata.util import hashutil, base32, pollmixin, log, keyutil
+from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
-from allmydata.interfaces import IStatsProducer, RIStubClient
+from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
+from allmydata.blacklist import Blacklist
+from allmydata.node import OldConfigOptionError
# Binary size units (powers of 1024), used when parsing human-readable
# size settings such as [storage]reserved_space. The visible text defined
# TiB in terms of an undefined GiB; restore the full chain.
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
-class StubClient(Referenceable):
- implements(RIStubClient)
-
def _make_secret():
    # Generate a fresh random secret: CRYPTO_VAL_SIZE random bytes,
    # base32-encoded, with a trailing newline so the value round-trips
    # cleanly through a one-line private config file.
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class SecretHolder:
    """Hold the client's lease master secret and convergence secret, and
    hand out the values derived from them."""

    def __init__(self, lease_secret, convergence_secret):
        # master secret from which per-lease renew/cancel secrets derive
        self._lease_secret = lease_secret
        # secret mixed into convergent-encryption keys
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        """Return the lease-renewal secret derived from the master secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the lease-cancel secret derived from the master secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        """Return the convergence secret, unmodified."""
        return self._convergence_secret
class KeyGenerator:
+ """I create RSA keys for mutable files. Each call to generate() returns a
+ single keypair. The keysize is specified first by the keysize= argument
+ to generate(), then with a default set by set_default_keysize(), then
+ with a built-in default of 2048 bits."""
def __init__(self):
self._remote = None
self.default_keysize = 2048
self._remote = keygen
def set_default_keysize(self, keysize):
"""Call this to override the size of the RSA keys created for new
- mutable files. The default of None means to let mutable.filenode
- choose its own size, which means 2048 bits."""
+ mutable files which don't otherwise specify a size. This will affect
+ all subsequent calls to generate() without a keysize= argument. The
+ default size is 2048 bits. Test cases should call this method once
+ during setup, to cause me to create smaller keys, so the unit tests
+ run faster."""
self.default_keysize = keysize
def generate(self, keysize=None):
+ """I return a Deferred that fires with a (verifyingkey, signingkey)
+ pair. I accept a keysize in bits (2048 bit keys are standard, smaller
+ keys are used for testing). If you do not provide a keysize, I will
+ use my default, which is set by a call to set_default_keysize(). If
+ set_default_keysize() has never been called, I will create 2048 bit
+ keys."""
keysize = keysize or self.default_keysize
if self._remote:
d = self._remote.callRemote('get_rsa_key_pair', keysize)
verifier = signer.get_verifying_key()
return defer.succeed( (verifier, signer) )
class Terminator(service.Service):
    """A Service that, when stopped, calls stop() on every registered
    client. Registrations live in a WeakKeyDictionary, so merely being
    registered does not keep a client alive."""

    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Arrange for c.stop() to be invoked when this service stops."""
        self._clients[c] = None

    def stopService(self):
        # stop every still-alive registered client, then let the base
        # class finish shutdown
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)
+
class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
self.init_introducer_client()
self.init_stats_provider()
- self.init_lease_secret()
+ self.init_secrets()
self.init_storage()
self.init_control()
+ self.helper = None
if self.get_config("helper", "enabled", False, boolean=True):
self.init_helper()
self._key_generator = KeyGenerator()
# ControlServer and Helper are attached after Tub startup
self.init_ftp_server()
self.init_sftp_server()
+ self.init_drop_uploader()
hotline_file = os.path.join(self.basedir,
self.SUICIDE_PREVENTION_HOTLINE_FILE)
if webport:
self.init_web(webport) # strports string
- def read_old_config_files(self):
- node.Node.read_old_config_files(self)
- copy = self._copy_config_from_file
- copy("introducer.furl", "client", "introducer.furl")
- copy("helper.furl", "client", "helper.furl")
- copy("key_generator.furl", "client", "key_generator.furl")
- copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
- if os.path.exists(os.path.join(self.basedir, "no_storage")):
- self.set_config("storage", "enabled", "false")
- if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
- self.set_config("storage", "readonly", "true")
- if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
- self.set_config("storage", "debug_discard", "true")
- if os.path.exists(os.path.join(self.basedir, "run_helper")):
- self.set_config("helper", "enabled", "true")
-
def init_introducer_client(self):
self.introducer_furl = self.get_config("client", "introducer.furl")
ic = IntroducerClient(self.tub, self.introducer_furl,
self.nickname,
str(allmydata.__full_version__),
- str(self.OLDEST_SUPPORTED_VERSION))
+ str(self.OLDEST_SUPPORTED_VERSION),
+ self.get_app_versions())
self.introducer_client = ic
# hold off on starting the IntroducerClient until our tub has been
# started, so we'll have a useful address on our RemoteReference, so
def get_stats(self):
return { 'node.uptime': time.time() - self.started_timestamp }
- def init_lease_secret(self):
- secret_s = self.get_or_create_private_config("secret", _make_secret)
- lease_secret = base32.a2b(secret_s)
- self._secret_holder = SecretHolder(lease_secret)
+ def init_secrets(self):
+ lease_s = self.get_or_create_private_config("secret", _make_secret)
+ lease_secret = base32.a2b(lease_s)
+ convergence_s = self.get_or_create_private_config('convergence',
+ _make_secret)
+ self.convergence = base32.a2b(convergence_s)
+ self._secret_holder = SecretHolder(lease_secret, self.convergence)
+
+ def _maybe_create_node_key(self):
+ # we only create the key once. On all subsequent runs, we re-use the
+ # existing key
+ def _make_key():
+ sk_vs,vk_vs = keyutil.make_keypair()
+ return sk_vs+"\n"
+ # for a while (between releases, before 1.10) this was known as
+ # server.privkey, but now it lives in node.privkey. This fallback can
+ # be removed after 1.10 is released.
+ sk_vs = self.get_private_config("server.privkey", None)
+ if not sk_vs:
+ sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
+ sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
+ self.write_config("node.pubkey", vk_vs+"\n")
+ self._server_key = sk
+
+ def _init_permutation_seed(self, ss):
+ seed = self.get_config_from_file("permutation-seed")
+ if not seed:
+ have_shares = ss.have_shares()
+ if have_shares:
+ # if the server has shares but not a recorded
+ # permutation-seed, then it has been around since pre-#466
+ # days, and the clients who uploaded those shares used our
+ # TubID as a permutation-seed. We should keep using that same
+ # seed to keep the shares in the same place in the permuted
+ # ring, so those clients don't have to perform excessive
+ # searches.
+ seed = base32.b2a(self.nodeid)
+ else:
+ # otherwise, we're free to use the more natural seed of our
+ # pubkey-based serverid
+ vk_bytes = self._server_key.get_verifying_key_bytes()
+ seed = base32.b2a(vk_bytes)
+ self.write_config("permutation-seed", seed+"\n")
+ return seed.strip()
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
return
readonly = self.get_config("storage", "readonly", False, boolean=True)
+ self._maybe_create_node_key()
+
storedir = os.path.join(self.basedir, self.STOREDIR)
data = self.get_config("storage", "reserved_space", None)
d = self.when_tub_ready()
# we can't do registerReference until the Tub is ready
def _publish(res):
- furl_file = os.path.join(self.basedir, "private", "storage.furl")
+ furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
- ri_name = RIStorageServer.__remote_name__
- self.introducer_client.publish(furl, "storage", ri_name)
+ ann = {"anonymous-storage-FURL": furl,
+ "permutation-seed-base32": self._init_permutation_seed(ss),
+ }
+ self.introducer_client.publish("storage", ann, self._server_key)
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="aLGBKw")
def init_client(self):
helper_furl = self.get_config("client", "helper.furl", None)
+ if helper_furl in ("None", ""):
+ helper_furl = None
+
DEP = self.DEFAULT_ENCODING_PARAMETERS
DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
- convergence_s = self.get_or_create_private_config('convergence', _make_secret)
- self.convergence = base32.a2b(convergence_s)
self.init_client_storage_broker()
- self.history = self.add_service(History(self.stats_provider))
- self.add_service(Uploader(helper_furl, self.stats_provider))
- download_cachedir = os.path.join(self.basedir,
- "private", "cache", "download")
- self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
- self.download_cache_dirman.setServiceParent(self)
- self.add_service(Downloader(self.stats_provider))
- self.init_stub_client()
+ self.history = History(self.stats_provider)
+ self.terminator = Terminator()
+ self.terminator.setServiceParent(self)
+ self.add_service(Uploader(helper_furl, self.stats_provider,
+ self.history))
+ self.init_blacklist()
self.init_nodemaker()
def init_client_storage_broker(self):
    def get_storage_broker(self):
        """Return the storage broker created by init_client_storage_broker()."""
        return self.storage_broker
- def init_stub_client(self):
- def _publish(res):
- # we publish an empty object so that the introducer can count how
- # many clients are connected and see what versions they're
- # running.
- sc = StubClient()
- furl = self.tub.registerReference(sc)
- ri_name = RIStubClient.__remote_name__
- self.introducer_client.publish(furl, "stub_client", ri_name)
- d = self.when_tub_ready()
- d.addCallback(_publish)
- d.addErrback(log.err, facility="tahoe.init",
- level=log.BAD, umid="OEHq3g")
+ def init_blacklist(self):
+ fn = os.path.join(self.basedir, "access.blacklist")
+ self.blacklist = Blacklist(fn)
def init_nodemaker(self):
+ default = self.get_config("client", "mutable.format", default="SDMF")
+ if default.upper() == "MDMF":
+ self.mutable_file_default = MDMF_VERSION
+ else:
+ self.mutable_file_default = SDMF_VERSION
self.nodemaker = NodeMaker(self.storage_broker,
self._secret_holder,
self.get_history(),
self.getServiceNamed("uploader"),
- self.getServiceNamed("downloader"),
- self.download_cache_dirman,
+ self.terminator,
self.get_encoding_parameters(),
- self._key_generator)
+ self.mutable_file_default,
+ self._key_generator,
+ self.blacklist)
def get_history(self):
- return self.getServiceNamed("history")
+ return self.history
def init_control(self):
d = self.when_tub_ready()
def init_helper(self):
d = self.when_tub_ready()
def _publish(self):
- h = Helper(os.path.join(self.basedir, "helper"),
- self.stats_provider, self.history)
- h.setServiceParent(self)
+ self.helper = Helper(os.path.join(self.basedir, "helper"),
+ self.storage_broker, self._secret_holder,
+ self.stats_provider, self.history)
# TODO: this is confusing. BASEDIR/private/helper.furl is created
# by the helper. BASEDIR/helper.furl is consumed by the client
# who wants to use the helper. I like having the filename be the
# same, since that makes 'cp' work smoothly, but the difference
# between config inputs and generated outputs is hard to see.
helper_furlfile = os.path.join(self.basedir,
- "private", "helper.furl")
- self.tub.registerReference(h, furlFile=helper_furlfile)
+ "private", "helper.furl").encode(get_filesystem_encoding())
+ self.tub.registerReference(self.helper, furlFile=helper_furlfile)
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="K0mW5w")
sftp_portstr, pubkey_file, privkey_file)
s.setServiceParent(self)
+ def init_drop_uploader(self):
+ if self.get_config("drop_upload", "enabled", False, boolean=True):
+ if self.get_config("drop_upload", "upload.dircap", None):
+ raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
+ "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")
+
+ upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
+ local_dir_utf8 = self.get_config("drop_upload", "local.directory")
+
+ try:
+ from allmydata.frontends import drop_upload
+ s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
+ s.setServiceParent(self)
+ s.startService()
+ except Exception, e:
+ self.log("couldn't start drop-uploader: %r", args=(e,))
+
def _check_hotline(self, hotline_file):
if os.path.exists(hotline_file):
mtime = os.stat(hotline_file)[stat.ST_MTIME]
temporary test network and need to know when it is safe to proceed
with an upload or download."""
def _check():
- return len(self.storage_broker.get_all_servers()) >= num_clients
+ return len(self.storage_broker.get_connected_servers()) >= num_clients
d = self.poll(_check, 0.5)
d.addCallback(lambda res: None)
return d
# dirnodes. The first takes a URI and produces a filenode or (new-style)
# dirnode. The other three create brand-new filenodes/dirnodes.
- def create_node_from_uri(self, writecap, readcap=None):
- # this returns synchronously.
- return self.nodemaker.create_from_cap(writecap, readcap)
+ def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
+ # This returns synchronously.
+ # Note that it does *not* validate the write_uri and read_uri; instead we
+ # may get an opaque node if there were any problems.
+ return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
+
+ def create_dirnode(self, initial_children={}, version=None):
+ d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
+ return d
- def create_empty_dirnode(self):
- return self.nodemaker.create_new_mutable_directory()
+ def create_immutable_dirnode(self, children, convergence=None):
+ return self.nodemaker.create_immutable_directory(children, convergence)
- def create_mutable_file(self, contents="", keysize=None):
- return self.nodemaker.create_mutable_file(contents, keysize)
+ def create_mutable_file(self, contents=None, keysize=None, version=None):
+ return self.nodemaker.create_mutable_file(contents, keysize,
+ version=version)
def upload(self, uploadable):
uploader = self.getServiceNamed("uploader")
- return uploader.upload(uploadable, history=self.get_history())
+ return uploader.upload(uploadable)