import os, stat, time, weakref
from allmydata import node
from base64 import urlsafe_b64encode

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding, \
     from_utf8_or_none
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError

KiB = 1024
MiB = 1024*KiB
GiB = 1024*MiB
TiB = 1024*GiB
PiB = 1024*TiB

def _make_secret():
    # secrets are stored base32-encoded, with a trailing newline, in files
    # under BASEDIR/private/
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret
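
# Illustrative sketch (not part of the original module): SecretHolder never
# hands out the lease secret itself; the renewal and cancel secrets are
# derived from it by hashing, so a storage server that learns one derived
# secret cannot recover the master secret or the other derivative.
#
#   holder = SecretHolder(os.urandom(32), os.urandom(32))  # made-up secrets
#   renewal = holder.get_renewal_secret()  # hash of the lease secret
#   cancel = holder.get_cancel_secret()    # a different hash, same input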
56 """I create RSA keys for mutable files. Each call to generate() returns a
57 single keypair. The keysize is specified first by the keysize= argument
58 to generate(), then with a default set by set_default_keysize(), then
59 with a built-in default of 2048 bits."""
62 self.default_keysize = 2048
64 def set_remote_generator(self, keygen):
66 def set_default_keysize(self, keysize):
67 """Call this to override the size of the RSA keys created for new
68 mutable files which don't otherwise specify a size. This will affect
69 all subsequent calls to generate() without a keysize= argument. The
70 default size is 2048 bits. Test cases should call this method once
71 during setup, to cause me to create smaller keys, so the unit tests
73 self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
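
# Usage sketch (illustrative): generate() returns a Deferred whether the
# keypair comes from a remote generator or is computed locally. The 522-bit
# size is an assumption chosen only to keep a demo fast; real mutable files
# use the 2048-bit default.
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)
#   d = kg.generate()
#   d.addCallback(lambda (verifier, signer): use_keys(verifier, signer))
#   # use_keys() is a hypothetical consumer of the keypair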

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)

class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size).
    # 'needed' (k) is the number of shares required to reconstruct a file.
    # 'desired' (happy) means that we will abort an upload unless we can
    # allocate space for at least this many. 'total' (n) is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        #print "Client.__init__(%r)" % (basedir,)
        node.Node.__init__(self, basedir)
        self.connected_enough_d = defer.Deferred()
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_magic_folder()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists, and what its mtime is. If the
        # file does not exist or has not been modified for a given timeout,
        # the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce
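
    # Illustrative trace (assumed behavior): on the first call after node
    # creation there is no announcement-seqnum file, so this writes "1\n" and
    # returns (1, nonce); a later restart reads "1", writes "2\n", and
    # returns (2, nonce). Subscribers can use the sequence number to ignore
    # stale announcements.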

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs, vk_vs = keyutil.make_keypair()
            return sk_vs + "\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk, vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs + "\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-" + base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed + "\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        # for the CLI to authenticate to local JSON endpoints
        self._auth_token = self._create_or_read_auth_token()

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def get_auth_token(self):
        """
        This returns a local authentication token, which is just some
        random data in "api_auth_token" which must be echoed to API
        calls by clients to authenticate themselves.

        Currently only the '/magic' URI (magic-folder status) checks this
        token; other endpoints are invited to include it as well, as
        appropriate.
        """
        return self._auth_token

    def _create_or_read_auth_token(self):
        """
        This returns the current auth-token data, possibly creating it and
        writing 'private/api_auth_token' in the process.
        """
        fname = os.path.join(self.basedir, 'private', 'api_auth_token')
        try:
            with open(fname, 'rb') as f:
                data = f.read()
        except (OSError, IOError):
            log.msg("Creating '%s'." % (fname,))
            with open(fname, 'wb') as f:
                data = urlsafe_b64encode(os.urandom(32))
                f.write(data)
        return data
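
    # Usage sketch (illustrative): a local tool authenticates by reading the
    # same file and echoing the token back to an endpoint. The HTTP detail is
    # an assumption about the caller, not something this method enforces.
    #
    #   token_path = os.path.join(basedir, 'private', 'api_auth_token')
    #   with open(token_path, 'rb') as f:
    #       token = f.read().strip()
    #   # ...then include `token` in requests to e.g. the '/magic' endpoint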

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        ps = self.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple([p.strip() for p in ps if p != ""])

        connection_threshold = min(self.encoding_params["k"],
                                   self.encoding_params["happy"] + 1)

        sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
                                              self.connected_enough_d, preferred_peers=preferred_peers)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)
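
    # Example tahoe.cfg setting read above (illustrative):
    #
    #   [client]
    #   mutable.format = MDMF
    #
    # Any value other than "MDMF" (compared case-insensitively) selects the
    # older SDMF format for newly created mutable files.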

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("ftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("sftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_magic_folder(self):
        #print "init_magic_folder"
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            raise OldConfigOptionError("The [drop_upload] section must be renamed to [magic_folder].\n"
                                       "See docs/frontends/magic-folder.rst for more information.")

        if self.get_config("magic_folder", "enabled", False, boolean=True):
            #print "magic folder enabled"
            upload_dircap = self.get_private_config("magic_folder_dircap")
            collective_dircap = self.get_private_config("collective_dircap")

            local_dir_config = self.get_config("magic_folder", "local.directory").decode("utf-8")
            local_dir = abspath_expanduser_unicode(local_dir_config, base=self.basedir)

            dbfile = os.path.join(self.basedir, "private", "magicfolderdb.sqlite")
            dbfile = abspath_expanduser_unicode(dbfile)

            from allmydata.frontends import magic_folder
            umask = self.get_config("magic_folder", "download.umask", 0077)
            s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
            self._magic_folder = s
            s.setServiceParent(self)

            # start processing the upload queue when we've connected to
            # enough servers
            self.connected_enough_d.addCallback(lambda ign: s.ready())

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
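
    # Usage sketch (illustrative, test code only): wait for the grid to knit
    # together before exercising an upload. `client` and DATA are assumed
    # test fixtures; upload.Data lives in allmydata.immutable.upload.
    #
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign:
    #                 client.upload(upload.Data(DATA, convergence=None)))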

    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
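
    # Usage sketch (illustrative; the cap string below is elided, not real):
    #
    #   n = client.create_node_from_uri("URI:DIR2:...")
    #   # returns a filenode/dirnode provider synchronously; an unparseable
    #   # cap yields an opaque "unknown" node instead of raising here.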

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)