1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26 SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
# Minimal remotely-referenceable object published to the introducer (see
# Client.init_stub_client) so the introducer can count connected clients.
# It carries no remote methods beyond the RIStubClient marker interface.
37 class StubClient(Referenceable):
38 implements(RIStubClient)
41 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
# Methods of SecretHolder (its 'class' statement, orig 43, is elided from this
# view). It holds the node-wide lease master secret and the convergence
# secret, and derives per-use lease secrets from the master.
44 def __init__(self, lease_secret, convergence_secret):
# lease_secret: master secret from which renewal/cancel secrets are derived.
# convergence_secret: secret mixed into convergent-encryption keys.
45 self._lease_secret = lease_secret
46 self._convergence_secret = convergence_secret
48 def get_renewal_secret(self):
# Derive this node's lease-renewal secret from the lease master secret.
49 return hashutil.my_renewal_secret_hash(self._lease_secret)
51 def get_cancel_secret(self):
# Derive this node's lease-cancel secret from the lease master secret.
52 return hashutil.my_cancel_secret_hash(self._lease_secret)
54 def get_convergence_secret(self):
# Plain accessor; no derivation is applied to the convergence secret.
55 return self._convergence_secret
# Body of class KeyGenerator (its 'class' statement, orig 57, is elided).
# Several lines inside generate() are also elided — see NOTE(review) below.
58 """I create RSA keys for mutable files. Each call to generate() returns a
59 single keypair. The keysize is specified first by the keysize= argument
60 to generate(), then with a default set by set_default_keysize(), then
61 with a built-in default of 2048 bits."""
# (inside __init__, whose 'def' line is elided:) built-in default key size.
64 self.default_keysize = 2048
66 def set_remote_generator(self, keygen):
# Body (orig 67, presumably 'self._remote = keygen') is elided from this view.
68 def set_default_keysize(self, keysize):
69 """Call this to override the size of the RSA keys created for new
70 mutable files which don't otherwise specify a size. This will affect
71 all subsequent calls to generate() without a keysize= argument. The
72 default size is 2048 bits. Test cases should call this method once
73 during setup, to cause me to create smaller keys, so the unit tests
75 self.default_keysize = keysize
77 def generate(self, keysize=None):
78 """I return a Deferred that fires with a (verifyingkey, signingkey)
79 pair. I accept a keysize in bits (2048 bit keys are standard, smaller
80 keys are used for testing). If you do not provide a keysize, I will
81 use my default, which is set by a call to set_default_keysize(). If
82 set_default_keysize() has never been called, I will create 2048 bit
# 'or' treats keysize=0 the same as None; both fall back to the default.
84 keysize = keysize or self.default_keysize
# NOTE(review): the 'if self._remote:' guard (orig 85) is elided — the
# remote path below only runs when a remote key generator is connected.
86 d = self._remote.callRemote('get_rsa_key_pair', keysize)
# Python 2 tuple-unpack parameter: rehydrate the serialized key strings.
87 def make_key_objs((verifying_key, signing_key)):
88 v = rsa.create_verifying_key_from_string(verifying_key)
89 s = rsa.create_signing_key_from_string(signing_key)
# ('return (v, s)' at orig 90 and 'return d' at orig 92 are elided.)
91 d.addCallback(make_key_objs)
# Local fallback path (the 'else:' at orig 93 is elided): generate in-process.
94 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
96 signer = rsa.generate(keysize)
97 verifier = signer.get_verifying_key()
98 return defer.succeed( (verifier, signer) )
# Service that tracks live client-ish objects weakly and stops them when the
# node shuts down. Weak keys mean registration does not keep objects alive.
100 class Terminator(service.Service):
# NOTE(review): __init__'s 'def' line (orig 101) is elided; the next line is
# its body.
102 self._clients = weakref.WeakKeyDictionary()
103 def register(self, c):
# Only the key matters; the value is a placeholder.
104 self._clients[c] = None
105 def stopService(self):
106 for c in self._clients:
# (orig 107, the per-client stop call — presumably 'c.stop()' — is elided.)
108 return service.Service.stopService(self)
# The main Tahoe client node: a twisted MultiService (via node.Node) that
# wires up the introducer client, storage server, uploader, helpers, and
# frontends according to tahoe.cfg.
111 class Client(node.Node, pollmixin.PollMixin):
112 implements(IStatsProducer)
# File (under basedir) recording which port the node listens on.
114 PORTNUMFILE = "client.port"
# Presence of this file arms a watchdog that shuts the node down when the
# file goes stale — see __init__ and _check_hotline. Used by tests.
117 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
119 # This means that if a storage server treats me as though I were a
120 # 1.0.0 storage client, it will work as they expect.
121 OLDEST_SUPPORTED_VERSION = "1.0.0"
123 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
124 # is the number of shares required to reconstruct a file. 'desired' means
125 # that we will abort an upload unless we can allocate space for at least
126 # this many. 'total' is the total number of shares created by encoding.
127 # If everybody has room then this is how many we will upload.
# NOTE(review): the "happy" and "n" entries of this dict (orig 129-130) and
# its closing lines (orig 132-133) are elided from this view. KiB is
# presumably defined in the elided import section — confirm.
128 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
131 "max_segment_size": 128*KiB,
134 def __init__(self, basedir="."):
# Run node.Node setup first (config parsing, Tub creation), then attach all
# client subsystems in dependency order. Several init calls in this sequence
# (orig 141-144, 146, 149) are elided from this view — presumably
# init_secrets/init_storage/init_control and the init_helper/init_key_gen
# guards; confirm against the full file.
135 node.Node.__init__(self, basedir)
# Used by get_stats() to report node.uptime.
136 self.started_timestamp = time.time()
137 self.logSource="Client"
# Copy the class-level dict so per-instance config edits (init_client) do not
# mutate the shared class attribute.
138 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
139 self.init_introducer_client()
140 self.init_stats_provider()
145 if self.get_config("helper", "enabled", False, boolean=True):
147 self._key_generator = KeyGenerator()
148 key_gen_furl = self.get_config("client", "key_generator.furl", None)
150 self.init_key_gen(key_gen_furl)
152 # ControlServer and Helper are attached after Tub startup
153 self.init_ftp_server()
154 self.init_sftp_server()
155 self.init_drop_uploader()
157 hotline_file = os.path.join(self.basedir,
158 self.SUICIDE_PREVENTION_HOTLINE_FILE)
159 if os.path.exists(hotline_file):
160 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
161 self.log("hotline file noticed (%ds old), starting timer" % age)
# Poll once per second; _check_hotline shuts the node down when the file
# goes stale or disappears.
162 hotline = TimerService(1.0, self._check_hotline, hotline_file)
163 hotline.setServiceParent(self)
165 # this needs to happen last, so it can use getServiceNamed() to
166 # acquire references to StorageServer and other web-statusable things
167 webport = self.get_config("node", "web.port", None)
# (the 'if webport:' guard at orig 168 is elided from this view)
169 self.init_web(webport) # strports string
171 def init_introducer_client(self):
# introducer.furl is mandatory: get_config with no default raises if missing.
172 self.introducer_furl = self.get_config("client", "introducer.furl")
# NOTE(review): one constructor argument line (orig 174, presumably the
# nickname) is elided from this view. 'allmydata' is referenced below but its
# import is in the elided header section — confirm.
173 ic = IntroducerClient(self.tub, self.introducer_furl,
175 str(allmydata.__full_version__),
176 str(self.OLDEST_SUPPORTED_VERSION))
177 self.introducer_client = ic
178 # hold off on starting the IntroducerClient until our tub has been
179 # started, so we'll have a useful address on our RemoteReference, so
180 # that the introducer's status page will show us.
181 d = self.when_tub_ready()
182 def _start_introducer_client(res):
183 ic.setServiceParent(self)
184 d.addCallback(_start_introducer_client)
# Log (rather than crash) if tub startup or service attachment fails.
185 d.addErrback(log.err, facility="tahoe.init",
186 level=log.BAD, umid="URyI5w")
def init_stats_provider(self):
    """Create the StatsProvider service, attach it to this node, and
    register this client as a producer of stats (see get_stats)."""
    furl = self.get_config("client", "stats_gatherer.furl", None)
    provider = StatsProvider(self, furl)
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
195 return { 'node.uptime': time.time() - self.started_timestamp }
197 def init_secrets(self):
# Load (or lazily create via _make_secret) the two node-wide secrets from
# BASEDIR/private/, then wrap them in a SecretHolder.
198 lease_s = self.get_or_create_private_config("secret", _make_secret)
199 lease_secret = base32.a2b(lease_s)
# NOTE(review): the continuation line of this call (orig 201, presumably
# '_make_secret)') is elided from this view.
200 convergence_s = self.get_or_create_private_config('convergence',
202 self.convergence = base32.a2b(convergence_s)
203 self._secret_holder = SecretHolder(lease_secret, self.convergence)
205 def init_storage(self):
# Configure and (once the Tub is ready) publish a StorageServer, driven by
# the [storage] section of tahoe.cfg. NOTE(review): several lines are elided
# from this view: the early 'return' for the disabled case (orig 208), the
# try/except around parse_abbreviated_size (orig 214-220), the expire-mode
# branches (orig 226-230, 234-235), 'sharetypes = []' (orig 239-240),
# 'self.add_service(ss)' (orig 257-258), and the 'def _publish(res):' header
# (orig 261) for the callback body below — confirm against the full file.
206 # should we run a storage server (and publish it for others to use)?
207 if not self.get_config("storage", "enabled", True, boolean=True):
209 readonly = self.get_config("storage", "readonly", False, boolean=True)
211 storedir = os.path.join(self.basedir, self.STOREDIR)
# reserved_space accepts human-readable sizes like "5G".
213 data = self.get_config("storage", "reserved_space", None)
216 reserved = parse_abbreviated_size(data)
218 log.msg("[storage]reserved_space= contains unparseable value %s"
222 discard = self.get_config("storage", "debug_discard", False,
225 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
# When expiration is enabled a mode must be configured explicitly.
227 mode = self.get_config("storage", "expire.mode") # require a mode
229 mode = self.get_config("storage", "expire.mode", "age")
231 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
232 if o_l_d is not None:
# e.g. "31days" -> seconds
233 o_l_d = parse_duration(o_l_d)
236 if mode == "cutoff-date":
237 cutoff_date = self.get_config("storage", "expire.cutoff_date")
238 cutoff_date = parse_date(cutoff_date)
# Which share types the lease-expiration crawler may delete.
241 if self.get_config("storage", "expire.immutable", True, boolean=True):
242 sharetypes.append("immutable")
243 if self.get_config("storage", "expire.mutable", True, boolean=True):
244 sharetypes.append("mutable")
245 expiration_sharetypes = tuple(sharetypes)
247 ss = StorageServer(storedir, self.nodeid,
248 reserved_space=reserved,
249 discard_storage=discard,
250 readonly_storage=readonly,
251 stats_provider=self.stats_provider,
252 expiration_enabled=expire,
253 expiration_mode=mode,
254 expiration_override_lease_duration=o_l_d,
255 expiration_cutoff_date=cutoff_date,
256 expiration_sharetypes=expiration_sharetypes)
259 d = self.when_tub_ready()
260 # we can't do registerReference until the Tub is ready
262 furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
# Persist the FURL so it stays stable across restarts, then announce the
# storage service to the grid via the introducer.
263 furl = self.tub.registerReference(ss, furlFile=furl_file)
264 ri_name = RIStorageServer.__remote_name__
265 self.introducer_client.publish(furl, "storage", ri_name)
266 d.addCallback(_publish)
267 d.addErrback(log.err, facility="tahoe.init",
268 level=log.BAD, umid="aLGBKw")
270 def init_client(self):
# Wire up the client-side upload/download machinery: encoding parameters,
# storage broker, history, terminator, uploader, stub client, blacklist,
# and nodemaker.
271 helper_furl = self.get_config("client", "helper.furl", None)
# Override the (per-instance copy of) default erasure-coding parameters
# from the [client] section of tahoe.cfg.
272 DEP = self.DEFAULT_ENCODING_PARAMETERS
273 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
274 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
275 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
277 self.init_client_storage_broker()
278 self.history = History(self.stats_provider)
279 self.terminator = Terminator()
280 self.terminator.setServiceParent(self)
# NOTE(review): the continuation line of this add_service call (orig 282)
# is elided from this view — presumably the History argument and close-paren.
281 self.add_service(Uploader(helper_furl, self.stats_provider,
283 self.init_stub_client()
284 self.init_blacklist()
285 self.init_nodemaker()
287 def init_client_storage_broker(self):
288 # create a StorageFarmBroker object, for use by Uploader/Downloader
289 # (and everybody else who wants to use storage servers)
290 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
291 self.storage_broker = sb
293 # load static server specifications from tahoe.cfg, if any.
294 # Not quite ready yet.
# The block below is intentionally commented-out (future static-server
# feature); one of its lines (orig 308, presumably '# else:') is elided.
295 #if self.config.has_section("client-server-selection"):
296 # server_params = {} # maps serverid to dict of parameters
297 # for (name, value) in self.config.items("client-server-selection"):
298 # pieces = name.split(".")
299 # if pieces[0] == "server":
300 # serverid = pieces[1]
301 # if serverid not in server_params:
302 # server_params[serverid] = {}
303 # server_params[serverid][pieces[2]] = value
304 # for serverid, params in server_params.items():
305 # server_type = params.pop("type")
306 # if server_type == "tahoe-foolscap":
307 # s = storage_client.NativeStorageClient(*params)
309 # msg = ("unrecognized server type '%s' in "
310 # "tahoe.cfg [client-server-selection]server.%s.type"
311 # % (server_type, serverid))
312 # raise storage_client.UnknownServerTypeError(msg)
313 # sb.add_server(s.serverid, s)
315 # check to see if we're supposed to use the introducer too
316 if self.get_config("client-server-selection", "use_introducer",
317 default=True, boolean=True):
# Let the broker learn about storage servers from introducer announcements.
318 sb.use_introducer(self.introducer_client)
320 def get_storage_broker(self):
# Accessor for the StorageFarmBroker created in init_client_storage_broker().
321 return self.storage_broker
323 def init_stub_client(self):
325 # we publish an empty object so that the introducer can count how
326 # many clients are connected and see what versions they're
# NOTE(review): the StubClient construction (presumably 'sc = StubClient()',
# orig 328) and the 'def _publish(res):' header for the callback body below
# are elided from this view — confirm against the full file.
329 furl = self.tub.registerReference(sc)
330 ri_name = RIStubClient.__remote_name__
331 self.introducer_client.publish(furl, "stub_client", ri_name)
332 d = self.when_tub_ready()
333 d.addCallback(_publish)
334 d.addErrback(log.err, facility="tahoe.init",
335 level=log.BAD, umid="OEHq3g")
def init_blacklist(self):
    """Load the cap-access blacklist from BASEDIR/access.blacklist."""
    self.blacklist = Blacklist(os.path.join(self.basedir, "access.blacklist"))
341 def init_nodemaker(self):
# Build the NodeMaker, the factory for filenodes/dirnodes used by
# create_node_from_uri and friends. NOTE(review): several constructor
# argument lines (orig 343-344, 346, 348-349) are elided from this view —
# presumably the secret holder, downloader/history, and blacklist; confirm.
342 self.nodemaker = NodeMaker(self.storage_broker,
345 self.getServiceNamed("uploader"),
347 self.get_encoding_parameters(),
# Choose the default mutable-file format from tahoe.cfg; anything other
# than "mdmf" falls back to SDMF (the 'else:' at orig 353 is elided).
350 default = self.get_config("client", "mutable.format", default="sdmf")
351 if default == "mdmf":
352 self.mutable_file_default = MDMF_VERSION
354 self.mutable_file_default = SDMF_VERSION
356 def get_history(self):
359 def init_control(self):
# Attach a ControlServer (debug/manhole-style remote control) once the Tub
# is ready, and record its FURL privately rather than publishing it.
# NOTE(review): the 'def _publish(res):' header and the ControlServer
# construction (orig 361-362) are elided from this view.
360 d = self.when_tub_ready()
363 c.setServiceParent(self)
364 control_url = self.tub.registerReference(c)
365 self.write_private_config("control.furl", control_url + "\n")
366 d.addCallback(_publish)
367 d.addErrback(log.err, facility="tahoe.init",
368 level=log.BAD, umid="d3tNXA")
370 def init_helper(self):
# Run an upload helper for other clients, registered once the Tub is ready.
# NOTE(review): the 'def _publish(res):' header (orig 372) for the callback
# body below is elided from this view.
371 d = self.when_tub_ready()
373 self.helper = Helper(os.path.join(self.basedir, "helper"),
374 self.storage_broker, self._secret_holder,
375 self.stats_provider, self.history)
376 # TODO: this is confusing. BASEDIR/private/helper.furl is created
377 # by the helper. BASEDIR/helper.furl is consumed by the client
378 # who wants to use the helper. I like having the filename be the
379 # same, since that makes 'cp' work smoothly, but the difference
380 # between config inputs and generated outputs is hard to see.
381 helper_furlfile = os.path.join(self.basedir,
382 "private", "helper.furl").encode(get_filesystem_encoding())
# Persisting to furlFile keeps the helper's FURL stable across restarts.
383 self.tub.registerReference(self.helper, furlFile=helper_furlfile)
384 d.addCallback(_publish)
385 d.addErrback(log.err, facility="tahoe.init",
386 level=log.BAD, umid="K0mW5w")
388 def init_key_gen(self, key_gen_furl):
# Once the Tub is ready, connect to the remote key-generator service at
# key_gen_furl; _got_key_generator is called with each (re)connection.
389 d = self.when_tub_ready()
# NOTE(review): this nested callback's parameter is named 'self', shadowing
# the method's 'self'. It only works if when_tub_ready()'s Deferred fires
# with this Client instance — confirm in node.Node.when_tub_ready before
# renaming or otherwise touching this.
390 def _subscribe(self):
391 self.tub.connectTo(key_gen_furl, self._got_key_generator)
392 d.addCallback(_subscribe)
393 d.addErrback(log.err, facility="tahoe.init",
394 level=log.BAD, umid="z9DMzw")
396 def _got_key_generator(self, key_generator):
# A connection to the remote key generator was (re)established: route future
# keypair requests to it and watch for disconnection.
397 self._key_generator.set_remote_generator(key_generator)
398 key_generator.notifyOnDisconnect(self._lost_key_generator)
400 def _lost_key_generator(self):
# Remote generator went away: fall back to local in-process key generation.
401 self._key_generator.set_remote_generator(None)
403 def set_default_mutable_keysize(self, keysize):
# Forward to KeyGenerator.set_default_keysize; used by tests to request
# smaller (faster) RSA keys.
404 self._key_generator.set_default_keysize(keysize)
406 def init_web(self, webport):
# Start the web/WAPI frontend on the given strports string. NOTE(review):
# the line attaching ws to this service (presumably 'self.add_service(ws)',
# orig ~414) is elided from this view.
407 self.log("init_web(webport=%s)", args=(webport,))
# Imported lazily so nodes without a web port don't pay the import cost.
409 from allmydata.webish import WebishServer
410 nodeurl_path = os.path.join(self.basedir, "node.url")
411 staticdir = self.get_config("node", "web.static", "public_html")
412 staticdir = os.path.expanduser(staticdir)
413 ws = WebishServer(self, webport, nodeurl_path, staticdir)
def init_ftp_server(self):
    """Start an FTP frontend when [ftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    portstr = self.get_config("ftpd", "port", "8021")

    # Imported lazily so nodes without the FTP frontend skip the import.
    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, portstr)
    server.setServiceParent(self)
def init_sftp_server(self):
    """Start an SFTP frontend when [sftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    portstr = self.get_config("sftpd", "port", "8022")
    # Host keys are mandatory: get_config with no default raises if missing.
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")

    # Imported lazily so nodes without the SFTP frontend skip the import.
    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
439 def init_drop_uploader(self):
# Start the drop-upload frontend (auto-upload files placed in a local
# directory) when enabled and fully configured. NOTE(review): the try/except
# wrapping the construction (orig 445, 449-450) and the 'else:' for the
# misconfiguration case (orig 452) are elided from this view; 'e' below is
# bound by the elided 'except' clause.
440 if self.get_config("drop_upload", "enabled", False, boolean=True):
441 upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
442 local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
444 if upload_dircap and local_dir_utf8:
# Imported lazily so nodes without this frontend skip the import.
446 from allmydata.frontends import drop_upload
447 s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
448 s.setServiceParent(self)
451 self.log("couldn't start drop-uploader: %r", args=(e,))
453 self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
455 def _check_hotline(self, hotline_file):
# Watchdog invoked every second by the TimerService set up in __init__:
# keep running only while the hotline file exists and was touched within
# the last two minutes. NOTE(review): the early 'return', the 'else:'
# branches, and the actual shutdown call (orig 459-460, 462, 464 —
# presumably reactor.stop()) are elided from this view.
456 if os.path.exists(hotline_file):
457 mtime = os.stat(hotline_file)[stat.ST_MTIME]
458 if mtime > time.time() - 120.0:
461 self.log("hotline file too old, shutting down")
463 self.log("hotline file missing, shutting down")
466 def get_encoding_parameters(self):
# Return the live (per-instance, possibly config-overridden) erasure-coding
# parameter dict. Callers receive the mutable dict itself, not a copy.
467 return self.DEFAULT_ENCODING_PARAMETERS
469 def connected_to_introducer(self):
# NOTE(review): the fall-through for a missing introducer client (orig
# 472-473, presumably 'return False') is elided from this view.
470 if self.introducer_client:
471 return self.introducer_client.connected_to_introducer()
# Legacy delegations to the SecretHolder created in init_secrets().
474 def get_renewal_secret(self): # this will go away
475 return self._secret_holder.get_renewal_secret()
477 def get_cancel_secret(self):
478 return self._secret_holder.get_cancel_secret()
480 def debug_wait_for_client_connections(self, num_clients):
481 """Return a Deferred that fires (with None) when we have connections
482 to the given number of peers. Useful for tests that set up a
483 temporary test network and need to know when it is safe to proceed
484 with an upload or download."""
# NOTE(review): the 'def _check():' header (orig 485) for the predicate
# below and the final 'return d' (orig 489) are elided from this view.
486 return len(self.storage_broker.get_connected_servers()) >= num_clients
# poll() comes from pollmixin.PollMixin: re-run _check every 0.5s until true.
487 d = self.poll(_check, 0.5)
488 d.addCallback(lambda res: None)
492 # these four methods are the primitives for creating filenodes and
493 # dirnodes. The first takes a URI and produces a filenode or (new-style)
494 # dirnode. The other three create brand-new filenodes/dirnodes.
def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
    """Synchronously build a filenode or dirnode from the given caps.

    The caps are not validated here; if anything is wrong with them the
    nodemaker may hand back an opaque node instead of raising.
    """
    node = self.nodemaker.create_from_cap(write_uri, read_uri,
                                          deep_immutable=deep_immutable,
                                          name=name)
    return node
# NOTE(review): mutable default argument ({}); safe only as long as nothing
# mutates initial_children — worth changing to None when the full method is
# in view. The tail of this method (orig 504-505, presumably 'return d') is
# elided here.
502 def create_dirnode(self, initial_children={}, version=None):
503 d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
506 def create_immutable_dirnode(self, children, convergence=None):
# Delegate straight to the nodemaker; returns whatever it returns
# (presumably a Deferred — confirm in NodeMaker.create_immutable_directory).
507 return self.nodemaker.create_immutable_directory(children, convergence)
509 def create_mutable_file(self, contents=None, keysize=None, version=None):
# Default the format to the one chosen in init_nodemaker. NOTE(review): the
# guard for the assignment below (orig 510, presumably 'if version is None:')
# and the call's continuation line (orig 513, presumably 'version=version)')
# are elided from this view.
511 version = self.mutable_file_default
512 return self.nodemaker.create_mutable_file(contents, keysize,
def upload(self, uploadable):
    """Hand *uploadable* to the node's 'uploader' service and return the
    result of its upload() call."""
    return self.getServiceNamed("uploader").upload(uploadable)