1 import os, stat, time, weakref
2 from allmydata import node
4 from zope.interface import implements
5 from twisted.internet import reactor, defer
6 from twisted.application import service
7 from twisted.application.internet import TimerService
8 from pycryptopp.publickey import rsa
11 from allmydata.storage.server import StorageServer
12 from allmydata import storage_client
13 from allmydata.immutable.upload import Uploader
14 from allmydata.immutable.offloaded import Helper
15 from allmydata.control import ControlServer
16 from allmydata.introducer.client import IntroducerClient
17 from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
18 from allmydata.util.encodingutil import get_filesystem_encoding
19 from allmydata.util.abbreviate import parse_abbreviated_size
20 from allmydata.util.time_format import parse_duration, parse_date
21 from allmydata.stats import StatsProvider
22 from allmydata.history import History
23 from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
24 from allmydata.nodemaker import NodeMaker
25 from allmydata.blacklist import Blacklist
26 from allmydata.node import OldConfigOptionError
# NOTE(review): the enclosing `def _make_secret():` header is elided in this
# copy of the file; this line returns a fresh random secret, base32-encoded,
# with a trailing newline so it can be written directly to a config file.
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
def __init__(self, lease_secret, convergence_secret):
    """Hold the client's lease secret and convergence secret."""
    self._convergence_secret = convergence_secret
    self._lease_secret = lease_secret
def get_renewal_secret(self):
    """Derive and return my lease-renewal secret from the lease secret."""
    lease = self._lease_secret
    return hashutil.my_renewal_secret_hash(lease)
def get_cancel_secret(self):
    """Derive and return my lease-cancel secret from the lease secret."""
    lease = self._lease_secret
    return hashutil.my_cancel_secret_hash(lease)
def get_convergence_secret(self):
    """Return the convergence secret (used to deduplicate uploads)."""
    secret = self._convergence_secret
    return secret
"""I create RSA keys for mutable files. Each call to generate() returns a
single keypair. The keysize is specified first by the keysize= argument
to generate(), then with a default set by set_default_keysize(), then
with a built-in default of 2048 bits."""
# NOTE(review): the `class KeyGenerator` header and the `def __init__`
# line are elided in this copy of the file; the next assignment is the
# __init__ body that seeds the built-in default key size.
self.default_keysize = 2048

def set_remote_generator(self, keygen):
# NOTE(review): the body of set_remote_generator is elided in this copy;
# presumably it records `keygen` (a remote key-generator reference, or
# None) for later use by generate() -- confirm against the full file.

# NOTE(review): the closing line of the next docstring is elided in this
# copy, leaving it unterminated as shown.
def set_default_keysize(self, keysize):
    """Call this to override the size of the RSA keys created for new
    mutable files which don't otherwise specify a size. This will affect
    all subsequent calls to generate() without a keysize= argument. The
    default size is 2048 bits. Test cases should call this method once
    during setup, to cause me to create smaller keys, so the unit tests
    self.default_keysize = keysize
def generate(self, keysize=None):
    """I return a Deferred that fires with a (verifyingkey, signingkey)
    pair. I accept a keysize in bits (2048 bit keys are standard, smaller
    keys are used for testing). If you do not provide a keysize, I will
    use my default, which is set by a call to set_default_keysize(). If
    set_default_keysize() has never been called, I will create 2048 bit
    # NOTE(review): the docstring's closing line is elided in this copy.
    keysize = keysize or self.default_keysize
    # NOTE(review): a guard such as `if self._remote:` appears elided here;
    # the callRemote path below only makes sense when a remote generator
    # was registered via set_remote_generator().
    d = self._remote.callRemote('get_rsa_key_pair', keysize)
    # Python 2 tuple-unpacking parameter syntax.
    def make_key_objs((verifying_key, signing_key)):
        v = rsa.create_verifying_key_from_string(verifying_key)
        s = rsa.create_signing_key_from_string(signing_key)
        # NOTE(review): a `return (v, s)` appears elided in this copy.
    d.addCallback(make_key_objs)
    # NOTE(review): a `return d` and the local-generation `else:` branch
    # header appear elided here; the lines below are the local path.
    # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
    # NOTE(review): the continuation of the comment above is elided.
    signer = rsa.generate(keysize)
    verifier = signer.get_verifying_key()
    return defer.succeed( (verifier, signer) )
class Terminator(service.Service):
    # Tracks live client objects (weakly, so they can be garbage
    # collected) and shuts them down when this service stops.
    # NOTE(review): the `def __init__` header is elided in this copy.
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        # The dict is used as a weak set; the value is unused.
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            # NOTE(review): the per-client shutdown call in this loop body
            # is elided in this copy of the file.
        return service.Service.stopService(self)
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    # If this file exists, the node polls it and shuts down when it goes
    # stale or disappears (see _check_exit_trigger below).
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   # NOTE(review): the entries between "k" and
                                   # "max_segment_size" (and the closing brace)
                                   # are elided in this copy of the file.
                                   "max_segment_size": 128*KiB,

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        # NOTE(review): several initialization calls (secrets, node key,
        # storage, client machinery) appear elided between here and the
        # helper check below.
        if self.get_config("helper", "enabled", False, boolean=True):
            # NOTE(review): the helper-setup body of this `if` is elided.
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        # NOTE(review): a guard (likely `if key_gen_furl:`) appears elided.
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to see
        # whether the file still exists, and what its mtime is. If the file does not
        # exist or has not been modified for a given timeout, the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        # NOTE(review): a guard (likely `if webport:`) appears elided here.
        self.init_web(webport) # strports string
def _sequencer(self):
    # Produce a fresh (seqnum, nonce) pair for introducer announcements,
    # persisting the incremented seqnum so it survives restarts.
    seqnum_s = self.get_config_from_file("announcement-seqnum")
    # NOTE(review): a fallback for a missing/empty seqnum file appears
    # elided in this copy.
    seqnum = int(seqnum_s.strip())
    seqnum += 1 # increment
    self.write_config("announcement-seqnum", "%d\n" % seqnum)
    nonce = _make_secret().strip()
    # NOTE(review): the final `return (seqnum, nonce)` appears elided.
def init_introducer_client(self):
    # Build the IntroducerClient from [client]introducer.furl, but defer
    # starting it until the Tub is ready (see below).
    self.introducer_furl = self.get_config("client", "introducer.furl")
    ic = IntroducerClient(self.tub, self.introducer_furl,
                          # NOTE(review): one constructor-argument line is
                          # elided here in this copy.
                          str(allmydata.__full_version__),
                          str(self.OLDEST_SUPPORTED_VERSION),
                          self.get_app_versions(),
                          # NOTE(review): the trailing argument line(s) and
                          # closing paren are elided in this copy.
    self.introducer_client = ic
    # hold off on starting the IntroducerClient until our tub has been
    # started, so we'll have a useful address on our RemoteReference, so
    # that the introducer's status page will show us.
    d = self.when_tub_ready()
    def _start_introducer_client(res):
        ic.setServiceParent(self)
    d.addCallback(_start_introducer_client)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="URyI5w")
def init_stats_provider(self):
    """Create our StatsProvider, attach it as a child service, and
    register ourselves as a producer of stats (see get_stats)."""
    furl = self.get_config("client", "stats_gatherer.furl", None)
    provider = StatsProvider(self, furl)
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
# NOTE(review): the `def get_stats(self):` header is elided in this copy;
# this is the IStatsProducer hook reporting how long the node has been up.
return { 'node.uptime': time.time() - self.started_timestamp }
def init_secrets(self):
    # Load (or create on first run) the lease secret and convergence
    # secret from BASEDIR/private/, and wrap them in a SecretHolder.
    lease_s = self.get_or_create_private_config("secret", _make_secret)
    lease_secret = base32.a2b(lease_s)
    convergence_s = self.get_or_create_private_config('convergence',
                          # NOTE(review): the default-factory argument line
                          # and closing paren are elided in this copy.
    self.convergence = base32.a2b(convergence_s)
    self._secret_holder = SecretHolder(lease_secret, self.convergence)
def init_node_key(self):
    # we only create the key once. On all subsequent runs, we re-use the
    # NOTE(review): the remainder of this comment and the local helper's
    # `def _make_key():` header are elided in this copy.
        sk_vs,vk_vs = keyutil.make_keypair()
        # NOTE(review): the helper's return line appears elided here.
    sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
    sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
    self.write_config("node.pubkey", vk_vs+"\n")
    # NOTE(review): the lines recording sk (presumably self._node_key,
    # which other methods read) appear elided in this copy.
def get_long_nodeid(self):
    """Return this node's long-form id: "v0-" plus the base32-encoded
    public-key bytes. This matches what IServer.get_longname() says
    about us elsewhere."""
    pubkey_bytes = self._node_key.get_verifying_key_bytes()
    return "v0-" + base32.b2a(pubkey_bytes)
def get_long_tubid(self):
    """Return the printable (base32) form of our Tub's nodeid."""
    tubid = self.nodeid
    return idlib.nodeid_b2a(tubid)
def _init_permutation_seed(self, ss):
    # Compute (and persist) the permutation seed announced for this
    # storage server.
    seed = self.get_config_from_file("permutation-seed")
    # NOTE(review): a guard (likely `if not seed:`) appears elided here.
    have_shares = ss.have_shares()
    # NOTE(review): a guard (likely `if have_shares:`) appears elided here.
    # if the server has shares but not a recorded
    # permutation-seed, then it has been around since pre-#466
    # days, and the clients who uploaded those shares used our
    # TubID as a permutation-seed. We should keep using that same
    # seed to keep the shares in the same place in the permuted
    # ring, so those clients don't have to perform excessive
    # NOTE(review): the last line of this comment is elided in this copy.
    seed = base32.b2a(self.nodeid)
    # NOTE(review): an `else:` appears elided before the branch below.
    # otherwise, we're free to use the more natural seed of our
    # pubkey-based serverid
    vk_bytes = self._node_key.get_verifying_key_bytes()
    seed = base32.b2a(vk_bytes)
    self.write_config("permutation-seed", seed+"\n")
    # NOTE(review): the final return of the seed appears elided.
def init_storage(self):
    # should we run a storage server (and publish it for others to use)?
    if not self.get_config("storage", "enabled", True, boolean=True):
        # NOTE(review): the early `return` appears elided in this copy.
    readonly = self.get_config("storage", "readonly", False, boolean=True)

    storedir = os.path.join(self.basedir, self.STOREDIR)

    data = self.get_config("storage", "reserved_space", None)
    # NOTE(review): a try/except around the size parse (logging and
    # re-raising on unparseable values) appears partially elided below.
    reserved = parse_abbreviated_size(data)
    log.msg("[storage]reserved_space= contains unparseable value %s"
            # NOTE(review): the continuation of this call and the rest of
            # the except-clause are elided in this copy.
    discard = self.get_config("storage", "debug_discard", False,
            # NOTE(review): the continuation line (boolean=True) is elided.

    expire = self.get_config("storage", "expire.enabled", False, boolean=True)
    # NOTE(review): an `if expire:` / `else:` structure around the next two
    # lines appears elided: the first requires a mode when expiration is
    # enabled, the second defaults it to "age" otherwise.
    mode = self.get_config("storage", "expire.mode") # require a mode
    mode = self.get_config("storage", "expire.mode", "age")

    o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
    if o_l_d is not None:
        o_l_d = parse_duration(o_l_d)

    # NOTE(review): a default assignment (likely `cutoff_date = None`)
    # appears elided here.
    if mode == "cutoff-date":
        cutoff_date = self.get_config("storage", "expire.cutoff_date")
        cutoff_date = parse_date(cutoff_date)

    # NOTE(review): the `sharetypes = []` initialization appears elided.
    if self.get_config("storage", "expire.immutable", True, boolean=True):
        sharetypes.append("immutable")
    if self.get_config("storage", "expire.mutable", True, boolean=True):
        sharetypes.append("mutable")
    expiration_sharetypes = tuple(sharetypes)

    ss = StorageServer(storedir, self.nodeid,
                       reserved_space=reserved,
                       discard_storage=discard,
                       readonly_storage=readonly,
                       stats_provider=self.stats_provider,
                       expiration_enabled=expire,
                       expiration_mode=mode,
                       expiration_override_lease_duration=o_l_d,
                       expiration_cutoff_date=cutoff_date,
                       expiration_sharetypes=expiration_sharetypes)
    # NOTE(review): attaching ss as a child service appears elided here.

    d = self.when_tub_ready()
    # we can't do registerReference until the Tub is ready
    # NOTE(review): the `def _publish(res):` header appears elided here;
    # the indented lines below are that callback's body.
        furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ann = {"anonymous-storage-FURL": furl,
               "permutation-seed-base32": self._init_permutation_seed(ss),
               # NOTE(review): the dict's closing line is elided in this copy.
        self.introducer_client.publish("storage", ann, self._node_key)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="aLGBKw")
def init_client(self):
    # Configure the upload/download machinery: encoding parameters,
    # storage broker, history, terminator, uploader, blacklist, nodemaker.
    helper_furl = self.get_config("client", "helper.furl", None)
    if helper_furl in ("None", ""):
        # NOTE(review): the body (presumably normalizing helper_furl to
        # None) is elided in this copy.

    DEP = self.encoding_params
    DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
    DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
    DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

    self.init_client_storage_broker()
    self.history = History(self.stats_provider)
    self.terminator = Terminator()
    self.terminator.setServiceParent(self)
    self.add_service(Uploader(helper_furl, self.stats_provider,
                     # NOTE(review): the trailing argument line(s) and
                     # closing parens are elided in this copy.
    self.init_blacklist()
    self.init_nodemaker()
def init_client_storage_broker(self):
    """Create the StorageFarmBroker used by Uploader/Downloader (and
    everybody else who needs storage servers), and optionally subscribe
    it to the introducer."""
    broker = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
    self.storage_broker = broker

    # Static server specifications from a [client-server-selection]
    # section of tahoe.cfg are not supported yet: the original sketch
    # parsed "server.<serverid>.<param>" entries into per-server dicts,
    # built a NativeStorageClient for type "tahoe-foolscap" (raising
    # UnknownServerTypeError otherwise), and added each to the broker.
    # For now, servers are learned only via the introducer.

    # check to see if we're supposed to use the introducer too
    if self.get_config("client-server-selection", "use_introducer",
                       default=True, boolean=True):
        broker.use_introducer(self.introducer_client)
def get_storage_broker(self):
    """Accessor for the StorageFarmBroker created in
    init_client_storage_broker()."""
    return self.storage_broker
def init_blacklist(self):
    """Create the access blacklist, backed by BASEDIR/access.blacklist."""
    blacklist_path = os.path.join(self.basedir, "access.blacklist")
    self.blacklist = Blacklist(blacklist_path)
def init_nodemaker(self):
    # Choose the default mutable-file format from [client]mutable.format
    # and build the NodeMaker that creates file/directory nodes.
    default = self.get_config("client", "mutable.format", default="SDMF")
    if default.upper() == "MDMF":
        self.mutable_file_default = MDMF_VERSION
    # NOTE(review): the `else:` line appears elided here.
        self.mutable_file_default = SDMF_VERSION
    self.nodemaker = NodeMaker(self.storage_broker,
                               # NOTE(review): argument lines elided here.
                               self.getServiceNamed("uploader"),
                               # NOTE(review): an argument line elided here.
                               self.get_encoding_parameters(),
                               self.mutable_file_default,
                               # NOTE(review): the trailing argument lines
                               # and closing paren are elided in this copy.
def get_history(self):
# NOTE(review): the body is elided in this copy; presumably it returns
# self.history (set in init_client) -- confirm against the full file.
def init_control(self):
    # Once the Tub is ready, register a ControlServer and record its FURL
    # in private/control.furl.
    d = self.when_tub_ready()
    # NOTE(review): the `def _publish(res):` header and the ControlServer
    # construction (binding `c`) appear elided in this copy.
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="d3tNXA")
def init_helper(self):
    # Once the Tub is ready, create the upload Helper and publish its FURL.
    d = self.when_tub_ready()
    # NOTE(review): the `def _publish(...):` callback header appears elided
    # in this copy; the indented lines below are its body.
        self.helper = Helper(os.path.join(self.basedir, "helper"),
                             self.storage_broker, self._secret_holder,
                             self.stats_provider, self.history)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl").encode(get_filesystem_encoding())
        self.tub.registerReference(self.helper, furlFile=helper_furlfile)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="K0mW5w")
def init_key_gen(self, key_gen_furl):
    """Once our Tub is ready, connect to the remote key generator at
    key_gen_furl and hand the reference to our KeyGenerator."""
    d = self.when_tub_ready()
    def _subscribe(node):
        # The when_tub_ready Deferred fires with this node, so 'node' is
        # 'self' here; the original confusingly named this parameter
        # 'self', shadowing the method's own.
        node.tub.connectTo(key_gen_furl, node._got_key_generator)
    d.addCallback(_subscribe)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="z9DMzw")
def _got_key_generator(self, key_generator):
    """Connection callback: adopt the remote key generator and watch for
    its disconnection."""
    remote = key_generator
    self._key_generator.set_remote_generator(remote)
    remote.notifyOnDisconnect(self._lost_key_generator)
def _lost_key_generator(self):
    """Disconnect callback: forget the remote key generator."""
    self._key_generator.set_remote_generator(None)
def set_default_mutable_keysize(self, keysize):
    """Set the default RSA key size used for new mutable files."""
    generator = self._key_generator
    generator.set_default_keysize(keysize)
def init_web(self, webport):
    # Start the web frontend listening on `webport` (a strports string).
    self.log("init_web(webport=%s)", args=(webport,))

    from allmydata.webish import WebishServer
    nodeurl_path = os.path.join(self.basedir, "node.url")
    staticdir = self.get_config("node", "web.static", "public_html")
    staticdir = os.path.expanduser(staticdir)
    ws = WebishServer(self, webport, nodeurl_path, staticdir)
    # NOTE(review): attaching ws as a child service appears elided in this
    # copy of the file.
def init_ftp_server(self):
    """Start an FTP frontend when [ftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    portstr = self.get_config("ftpd", "port", "8021")

    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, portstr)
    server.setServiceParent(self)
def init_sftp_server(self):
    """Start an SFTP frontend when [sftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    portstr = self.get_config("sftpd", "port", "8022")
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")

    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
def init_drop_uploader(self):
    # Start the drop-upload frontend if enabled; best-effort (failures
    # are logged, not fatal).
    if self.get_config("drop_upload", "enabled", False, boolean=True):
        if self.get_config("drop_upload", "upload.dircap", None):
            raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                       "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

        upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
        local_dir_utf8 = self.get_config("drop_upload", "local.directory")

        # NOTE(review): a `try:` line appears elided here.
            from allmydata.frontends import drop_upload
            s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
            s.setServiceParent(self)
        # NOTE(review): the matching `except ...` header (binding `e`)
        # appears elided in this copy.
            self.log("couldn't start drop-uploader: %r", args=(e,))
def _check_exit_trigger(self, exit_trigger_file):
    # Timer callback: shut the node down when the exit-trigger file is
    # missing or has not been touched within the last 120 seconds.
    if os.path.exists(exit_trigger_file):
        mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
        if mtime > time.time() - 120.0:
            # NOTE(review): the early `return` (file is fresh) and an
            # `else:` appear elided in this copy.
        self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        # NOTE(review): an `else:` appears elided before the line below.
        self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
    # NOTE(review): the actual shutdown call (presumably reactor.stop())
    # appears elided in this copy.
def get_encoding_parameters(self):
    """Return the erasure-coding parameter dict currently in effect
    (keys include "k", "happy", "n", "max_segment_size")."""
    return self.encoding_params
def connected_to_introducer(self):
    # Delegate to our IntroducerClient, if we have one.
    if self.introducer_client:
        return self.introducer_client.connected_to_introducer()
    # NOTE(review): the fallback (presumably `return False`) appears
    # elided in this copy of the file.
def get_renewal_secret(self): # this will go away
    """Delegate to our SecretHolder's renewal secret."""
    holder = self._secret_holder
    return holder.get_renewal_secret()
def get_cancel_secret(self):
    """Delegate to our SecretHolder's cancel secret."""
    holder = self._secret_holder
    return holder.get_cancel_secret()
def debug_wait_for_client_connections(self, num_clients):
    """Return a Deferred that fires (with None) when we have connections
    to the given number of peers. Useful for tests that set up a
    temporary test network and need to know when it is safe to proceed
    with an upload or download."""
    # NOTE(review): the local `def _check():` header appears elided here.
        return len(self.storage_broker.get_connected_servers()) >= num_clients
    d = self.poll(_check, 0.5)
    d.addCallback(lambda res: None)
    # NOTE(review): the final `return d` appears elided in this copy.
535 # these four methods are the primitives for creating filenodes and
536 # dirnodes. The first takes a URI and produces a filenode or (new-style)
537 # dirnode. The other three create brand-new filenodes/dirnodes.
def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
    """Synchronously build a filenode or (new-style) dirnode from caps.

    The caps are not validated here; problems yield an opaque node
    rather than an exception.
    """
    return self.nodemaker.create_from_cap(write_uri, read_uri,
                                          deep_immutable=deep_immutable,
                                          name=name)
def create_dirnode(self, initial_children={}, version=None):
    # NOTE(review): mutable default argument `{}` -- safe only as long as
    # callees never mutate it; consider `initial_children=None` upstream.
    d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
    # NOTE(review): the final `return d` appears elided in this copy.
def create_immutable_dirnode(self, children, convergence=None):
    """Create an immutable directory from `children`, delegating to the
    NodeMaker."""
    maker = self.nodemaker
    return maker.create_immutable_directory(children, convergence)
def create_mutable_file(self, contents=None, keysize=None, version=None):
    # Delegate to the NodeMaker to create a new mutable file.
    return self.nodemaker.create_mutable_file(contents, keysize,
    # NOTE(review): the continuation line (passing version and closing the
    # call) is elided in this copy of the file.
def upload(self, uploadable):
    """Hand `uploadable` to the "uploader" service and return its result."""
    return self.getServiceNamed("uploader").upload(uploadable)