1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26 SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
29 from allmydata.node import OldConfigOptionError
38 class StubClient(Referenceable):
39 implements(RIStubClient)
42 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
45 def __init__(self, lease_secret, convergence_secret):
46 self._lease_secret = lease_secret
47 self._convergence_secret = convergence_secret
49 def get_renewal_secret(self):
50 return hashutil.my_renewal_secret_hash(self._lease_secret)
52 def get_cancel_secret(self):
53 return hashutil.my_cancel_secret_hash(self._lease_secret)
55 def get_convergence_secret(self):
56 return self._convergence_secret
59 """I create RSA keys for mutable files. Each call to generate() returns a
60 single keypair. The keysize is specified first by the keysize= argument
61 to generate(), then with a default set by set_default_keysize(), then
62 with a built-in default of 2048 bits."""
65 self.default_keysize = 2048
67 def set_remote_generator(self, keygen):
69 def set_default_keysize(self, keysize):
70 """Call this to override the size of the RSA keys created for new
71 mutable files which don't otherwise specify a size. This will affect
72 all subsequent calls to generate() without a keysize= argument. The
73 default size is 2048 bits. Test cases should call this method once
74 during setup, to cause me to create smaller keys, so the unit tests
76 self.default_keysize = keysize
78 def generate(self, keysize=None):
79 """I return a Deferred that fires with a (verifyingkey, signingkey)
80 pair. I accept a keysize in bits (2048 bit keys are standard, smaller
81 keys are used for testing). If you do not provide a keysize, I will
82 use my default, which is set by a call to set_default_keysize(). If
83 set_default_keysize() has never been called, I will create 2048 bit
85 keysize = keysize or self.default_keysize
87 d = self._remote.callRemote('get_rsa_key_pair', keysize)
88 def make_key_objs((verifying_key, signing_key)):
89 v = rsa.create_verifying_key_from_string(verifying_key)
90 s = rsa.create_signing_key_from_string(signing_key)
92 d.addCallback(make_key_objs)
95 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
97 signer = rsa.generate(keysize)
98 verifier = signer.get_verifying_key()
99 return defer.succeed( (verifier, signer) )
101 class Terminator(service.Service):
103 self._clients = weakref.WeakKeyDictionary()
104 def register(self, c):
105 self._clients[c] = None
106 def stopService(self):
107 for c in self._clients:
109 return service.Service.stopService(self)
112 class Client(node.Node, pollmixin.PollMixin):
113 implements(IStatsProducer)
115 PORTNUMFILE = "client.port"
118 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
120 # This means that if a storage server treats me as though I were a
121 # 1.0.0 storage client, it will work as they expect.
122 OLDEST_SUPPORTED_VERSION = "1.0.0"
124 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
125 # is the number of shares required to reconstruct a file. 'desired' means
126 # that we will abort an upload unless we can allocate space for at least
127 # this many. 'total' is the total number of shares created by encoding.
128 # If everybody has room then this is is how many we will upload.
129 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
132 "max_segment_size": 128*KiB,
135 def __init__(self, basedir="."):
136 node.Node.__init__(self, basedir)
137 self.started_timestamp = time.time()
138 self.logSource="Client"
139 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
140 self.init_introducer_client()
141 self.init_stats_provider()
146 if self.get_config("helper", "enabled", False, boolean=True):
148 self._key_generator = KeyGenerator()
149 key_gen_furl = self.get_config("client", "key_generator.furl", None)
151 self.init_key_gen(key_gen_furl)
153 # ControlServer and Helper are attached after Tub startup
154 self.init_ftp_server()
155 self.init_sftp_server()
156 self.init_drop_uploader()
158 hotline_file = os.path.join(self.basedir,
159 self.SUICIDE_PREVENTION_HOTLINE_FILE)
160 if os.path.exists(hotline_file):
161 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
162 self.log("hotline file noticed (%ds old), starting timer" % age)
163 hotline = TimerService(1.0, self._check_hotline, hotline_file)
164 hotline.setServiceParent(self)
166 # this needs to happen last, so it can use getServiceNamed() to
167 # acquire references to StorageServer and other web-statusable things
168 webport = self.get_config("node", "web.port", None)
170 self.init_web(webport) # strports string
172 def init_introducer_client(self):
173 self.introducer_furl = self.get_config("client", "introducer.furl")
174 ic = IntroducerClient(self.tub, self.introducer_furl,
176 str(allmydata.__full_version__),
177 str(self.OLDEST_SUPPORTED_VERSION))
178 self.introducer_client = ic
179 # hold off on starting the IntroducerClient until our tub has been
180 # started, so we'll have a useful address on our RemoteReference, so
181 # that the introducer's status page will show us.
182 d = self.when_tub_ready()
183 def _start_introducer_client(res):
184 ic.setServiceParent(self)
185 d.addCallback(_start_introducer_client)
186 d.addErrback(log.err, facility="tahoe.init",
187 level=log.BAD, umid="URyI5w")
189 def init_stats_provider(self):
190 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
191 self.stats_provider = StatsProvider(self, gatherer_furl)
192 self.add_service(self.stats_provider)
193 self.stats_provider.register_producer(self)
196 return { 'node.uptime': time.time() - self.started_timestamp }
198 def init_secrets(self):
199 lease_s = self.get_or_create_private_config("secret", _make_secret)
200 lease_secret = base32.a2b(lease_s)
201 convergence_s = self.get_or_create_private_config('convergence',
203 self.convergence = base32.a2b(convergence_s)
204 self._secret_holder = SecretHolder(lease_secret, self.convergence)
206 def init_storage(self):
207 # should we run a storage server (and publish it for others to use)?
208 if not self.get_config("storage", "enabled", True, boolean=True):
210 readonly = self.get_config("storage", "readonly", False, boolean=True)
212 storedir = os.path.join(self.basedir, self.STOREDIR)
214 data = self.get_config("storage", "reserved_space", None)
217 reserved = parse_abbreviated_size(data)
219 log.msg("[storage]reserved_space= contains unparseable value %s"
223 discard = self.get_config("storage", "debug_discard", False,
226 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
228 mode = self.get_config("storage", "expire.mode") # require a mode
230 mode = self.get_config("storage", "expire.mode", "age")
232 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
233 if o_l_d is not None:
234 o_l_d = parse_duration(o_l_d)
237 if mode == "cutoff-date":
238 cutoff_date = self.get_config("storage", "expire.cutoff_date")
239 cutoff_date = parse_date(cutoff_date)
242 if self.get_config("storage", "expire.immutable", True, boolean=True):
243 sharetypes.append("immutable")
244 if self.get_config("storage", "expire.mutable", True, boolean=True):
245 sharetypes.append("mutable")
246 expiration_sharetypes = tuple(sharetypes)
248 ss = StorageServer(storedir, self.nodeid,
249 reserved_space=reserved,
250 discard_storage=discard,
251 readonly_storage=readonly,
252 stats_provider=self.stats_provider,
253 expiration_enabled=expire,
254 expiration_mode=mode,
255 expiration_override_lease_duration=o_l_d,
256 expiration_cutoff_date=cutoff_date,
257 expiration_sharetypes=expiration_sharetypes)
260 d = self.when_tub_ready()
261 # we can't do registerReference until the Tub is ready
263 furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
264 furl = self.tub.registerReference(ss, furlFile=furl_file)
265 ri_name = RIStorageServer.__remote_name__
266 self.introducer_client.publish(furl, "storage", ri_name)
267 d.addCallback(_publish)
268 d.addErrback(log.err, facility="tahoe.init",
269 level=log.BAD, umid="aLGBKw")
271 def init_client(self):
272 helper_furl = self.get_config("client", "helper.furl", None)
273 DEP = self.DEFAULT_ENCODING_PARAMETERS
274 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
275 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
276 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
278 self.init_client_storage_broker()
279 self.history = History(self.stats_provider)
280 self.terminator = Terminator()
281 self.terminator.setServiceParent(self)
282 self.add_service(Uploader(helper_furl, self.stats_provider,
284 self.init_stub_client()
285 self.init_blacklist()
286 self.init_nodemaker()
288 def init_client_storage_broker(self):
289 # create a StorageFarmBroker object, for use by Uploader/Downloader
290 # (and everybody else who wants to use storage servers)
291 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
292 self.storage_broker = sb
294 # load static server specifications from tahoe.cfg, if any.
295 # Not quite ready yet.
296 #if self.config.has_section("client-server-selection"):
297 # server_params = {} # maps serverid to dict of parameters
298 # for (name, value) in self.config.items("client-server-selection"):
299 # pieces = name.split(".")
300 # if pieces[0] == "server":
301 # serverid = pieces[1]
302 # if serverid not in server_params:
303 # server_params[serverid] = {}
304 # server_params[serverid][pieces[2]] = value
305 # for serverid, params in server_params.items():
306 # server_type = params.pop("type")
307 # if server_type == "tahoe-foolscap":
308 # s = storage_client.NativeStorageClient(*params)
310 # msg = ("unrecognized server type '%s' in "
311 # "tahoe.cfg [client-server-selection]server.%s.type"
312 # % (server_type, serverid))
313 # raise storage_client.UnknownServerTypeError(msg)
314 # sb.add_server(s.serverid, s)
316 # check to see if we're supposed to use the introducer too
317 if self.get_config("client-server-selection", "use_introducer",
318 default=True, boolean=True):
319 sb.use_introducer(self.introducer_client)
321 def get_storage_broker(self):
322 return self.storage_broker
324 def init_stub_client(self):
326 # we publish an empty object so that the introducer can count how
327 # many clients are connected and see what versions they're
330 furl = self.tub.registerReference(sc)
331 ri_name = RIStubClient.__remote_name__
332 self.introducer_client.publish(furl, "stub_client", ri_name)
333 d = self.when_tub_ready()
334 d.addCallback(_publish)
335 d.addErrback(log.err, facility="tahoe.init",
336 level=log.BAD, umid="OEHq3g")
338 def init_blacklist(self):
339 fn = os.path.join(self.basedir, "access.blacklist")
340 self.blacklist = Blacklist(fn)
342 def init_nodemaker(self):
343 default = self.get_config("client", "mutable.format", default="SDMF")
344 if default.upper() == "MDMF":
345 self.mutable_file_default = MDMF_VERSION
347 self.mutable_file_default = SDMF_VERSION
348 self.nodemaker = NodeMaker(self.storage_broker,
351 self.getServiceNamed("uploader"),
353 self.get_encoding_parameters(),
354 self.mutable_file_default,
358 def get_history(self):
361 def init_control(self):
362 d = self.when_tub_ready()
365 c.setServiceParent(self)
366 control_url = self.tub.registerReference(c)
367 self.write_private_config("control.furl", control_url + "\n")
368 d.addCallback(_publish)
369 d.addErrback(log.err, facility="tahoe.init",
370 level=log.BAD, umid="d3tNXA")
372 def init_helper(self):
373 d = self.when_tub_ready()
375 self.helper = Helper(os.path.join(self.basedir, "helper"),
376 self.storage_broker, self._secret_holder,
377 self.stats_provider, self.history)
378 # TODO: this is confusing. BASEDIR/private/helper.furl is created
379 # by the helper. BASEDIR/helper.furl is consumed by the client
380 # who wants to use the helper. I like having the filename be the
381 # same, since that makes 'cp' work smoothly, but the difference
382 # between config inputs and generated outputs is hard to see.
383 helper_furlfile = os.path.join(self.basedir,
384 "private", "helper.furl").encode(get_filesystem_encoding())
385 self.tub.registerReference(self.helper, furlFile=helper_furlfile)
386 d.addCallback(_publish)
387 d.addErrback(log.err, facility="tahoe.init",
388 level=log.BAD, umid="K0mW5w")
390 def init_key_gen(self, key_gen_furl):
391 d = self.when_tub_ready()
392 def _subscribe(self):
393 self.tub.connectTo(key_gen_furl, self._got_key_generator)
394 d.addCallback(_subscribe)
395 d.addErrback(log.err, facility="tahoe.init",
396 level=log.BAD, umid="z9DMzw")
398 def _got_key_generator(self, key_generator):
399 self._key_generator.set_remote_generator(key_generator)
400 key_generator.notifyOnDisconnect(self._lost_key_generator)
402 def _lost_key_generator(self):
403 self._key_generator.set_remote_generator(None)
405 def set_default_mutable_keysize(self, keysize):
406 self._key_generator.set_default_keysize(keysize)
408 def init_web(self, webport):
409 self.log("init_web(webport=%s)", args=(webport,))
411 from allmydata.webish import WebishServer
412 nodeurl_path = os.path.join(self.basedir, "node.url")
413 staticdir = self.get_config("node", "web.static", "public_html")
414 staticdir = os.path.expanduser(staticdir)
415 ws = WebishServer(self, webport, nodeurl_path, staticdir)
418 def init_ftp_server(self):
419 if self.get_config("ftpd", "enabled", False, boolean=True):
420 accountfile = self.get_config("ftpd", "accounts.file", None)
421 accounturl = self.get_config("ftpd", "accounts.url", None)
422 ftp_portstr = self.get_config("ftpd", "port", "8021")
424 from allmydata.frontends import ftpd
425 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
426 s.setServiceParent(self)
428 def init_sftp_server(self):
429 if self.get_config("sftpd", "enabled", False, boolean=True):
430 accountfile = self.get_config("sftpd", "accounts.file", None)
431 accounturl = self.get_config("sftpd", "accounts.url", None)
432 sftp_portstr = self.get_config("sftpd", "port", "8022")
433 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
434 privkey_file = self.get_config("sftpd", "host_privkey_file")
436 from allmydata.frontends import sftpd
437 s = sftpd.SFTPServer(self, accountfile, accounturl,
438 sftp_portstr, pubkey_file, privkey_file)
439 s.setServiceParent(self)
441 def init_drop_uploader(self):
442 if self.get_config("drop_upload", "enabled", False, boolean=True):
443 if self.get_config("drop_upload", "upload.dircap", None):
444 raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
445 "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")
447 upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
448 local_dir_utf8 = self.get_config("drop_upload", "local.directory")
451 from allmydata.frontends import drop_upload
452 s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
453 s.setServiceParent(self)
456 self.log("couldn't start drop-uploader: %r", args=(e,))
458 def _check_hotline(self, hotline_file):
459 if os.path.exists(hotline_file):
460 mtime = os.stat(hotline_file)[stat.ST_MTIME]
461 if mtime > time.time() - 120.0:
464 self.log("hotline file too old, shutting down")
466 self.log("hotline file missing, shutting down")
469 def get_encoding_parameters(self):
470 return self.DEFAULT_ENCODING_PARAMETERS
472 def connected_to_introducer(self):
473 if self.introducer_client:
474 return self.introducer_client.connected_to_introducer()
477 def get_renewal_secret(self): # this will go away
478 return self._secret_holder.get_renewal_secret()
480 def get_cancel_secret(self):
481 return self._secret_holder.get_cancel_secret()
483 def debug_wait_for_client_connections(self, num_clients):
484 """Return a Deferred that fires (with None) when we have connections
485 to the given number of peers. Useful for tests that set up a
486 temporary test network and need to know when it is safe to proceed
487 with an upload or download."""
489 return len(self.storage_broker.get_connected_servers()) >= num_clients
490 d = self.poll(_check, 0.5)
491 d.addCallback(lambda res: None)
495 # these four methods are the primitives for creating filenodes and
496 # dirnodes. The first takes a URI and produces a filenode or (new-style)
497 # dirnode. The other three create brand-new filenodes/dirnodes.
499 def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
500 # This returns synchronously.
501 # Note that it does *not* validate the write_uri and read_uri; instead we
502 # may get an opaque node if there were any problems.
503 return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
505 def create_dirnode(self, initial_children={}, version=None):
506 d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
509 def create_immutable_dirnode(self, children, convergence=None):
510 return self.nodemaker.create_immutable_directory(children, convergence)
512 def create_mutable_file(self, contents=None, keysize=None, version=None):
513 return self.nodemaker.create_mutable_file(contents, keysize,
516 def upload(self, uploadable):
517 uploader = self.getServiceNamed("uploader")
518 return uploader.upload(uploadable)