1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26 SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
37 class StubClient(Referenceable):
38 implements(RIStubClient)
41 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
44 def __init__(self, lease_secret, convergence_secret):
45 self._lease_secret = lease_secret
46 self._convergence_secret = convergence_secret
48 def get_renewal_secret(self):
49 return hashutil.my_renewal_secret_hash(self._lease_secret)
51 def get_cancel_secret(self):
52 return hashutil.my_cancel_secret_hash(self._lease_secret)
54 def get_convergence_secret(self):
55 return self._convergence_secret
58 """I create RSA keys for mutable files. Each call to generate() returns a
59 single keypair. The keysize is specified first by the keysize= argument
60 to generate(), then with a default set by set_default_keysize(), then
61 with a built-in default of 2048 bits."""
64 self.default_keysize = 2048
66 def set_remote_generator(self, keygen):
68 def set_default_keysize(self, keysize):
69 """Call this to override the size of the RSA keys created for new
70 mutable files which don't otherwise specify a size. This will affect
71 all subsequent calls to generate() without a keysize= argument. The
72 default size is 2048 bits. Test cases should call this method once
73 during setup, to cause me to create smaller keys, so the unit tests
75 self.default_keysize = keysize
77 def generate(self, keysize=None):
78 """I return a Deferred that fires with a (verifyingkey, signingkey)
79 pair. I accept a keysize in bits (2048 bit keys are standard, smaller
80 keys are used for testing). If you do not provide a keysize, I will
81 use my default, which is set by a call to set_default_keysize(). If
82 set_default_keysize() has never been called, I will create 2048 bit
84 keysize = keysize or self.default_keysize
86 d = self._remote.callRemote('get_rsa_key_pair', keysize)
87 def make_key_objs((verifying_key, signing_key)):
88 v = rsa.create_verifying_key_from_string(verifying_key)
89 s = rsa.create_signing_key_from_string(signing_key)
91 d.addCallback(make_key_objs)
94 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
96 signer = rsa.generate(keysize)
97 verifier = signer.get_verifying_key()
98 return defer.succeed( (verifier, signer) )
100 class Terminator(service.Service):
102 self._clients = weakref.WeakKeyDictionary()
103 def register(self, c):
104 self._clients[c] = None
105 def stopService(self):
106 for c in self._clients:
108 return service.Service.stopService(self)
111 class Client(node.Node, pollmixin.PollMixin):
112 implements(IStatsProducer)
114 PORTNUMFILE = "client.port"
117 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
119 # This means that if a storage server treats me as though I were a
120 # 1.0.0 storage client, it will work as they expect.
121 OLDEST_SUPPORTED_VERSION = "1.0.0"
123 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
124 # is the number of shares required to reconstruct a file. 'desired' means
125 # that we will abort an upload unless we can allocate space for at least
126 # this many. 'total' is the total number of shares created by encoding.
127 # If everybody has room then this is is how many we will upload.
128 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
131 "max_segment_size": 128*KiB,
134 def __init__(self, basedir="."):
135 node.Node.__init__(self, basedir)
136 self.started_timestamp = time.time()
137 self.logSource="Client"
138 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
139 self.init_introducer_client()
140 self.init_stats_provider()
145 if self.get_config("helper", "enabled", False, boolean=True):
147 self._key_generator = KeyGenerator()
148 key_gen_furl = self.get_config("client", "key_generator.furl", None)
150 self.init_key_gen(key_gen_furl)
152 # ControlServer and Helper are attached after Tub startup
153 self.init_ftp_server()
154 self.init_sftp_server()
155 self.init_drop_uploader()
157 hotline_file = os.path.join(self.basedir,
158 self.SUICIDE_PREVENTION_HOTLINE_FILE)
159 if os.path.exists(hotline_file):
160 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
161 self.log("hotline file noticed (%ds old), starting timer" % age)
162 hotline = TimerService(1.0, self._check_hotline, hotline_file)
163 hotline.setServiceParent(self)
165 # this needs to happen last, so it can use getServiceNamed() to
166 # acquire references to StorageServer and other web-statusable things
167 webport = self.get_config("node", "web.port", None)
169 self.init_web(webport) # strports string
171 def init_introducer_client(self):
172 self.introducer_furl = self.get_config("client", "introducer.furl")
173 ic = IntroducerClient(self.tub, self.introducer_furl,
175 str(allmydata.__full_version__),
176 str(self.OLDEST_SUPPORTED_VERSION))
177 self.introducer_client = ic
178 # hold off on starting the IntroducerClient until our tub has been
179 # started, so we'll have a useful address on our RemoteReference, so
180 # that the introducer's status page will show us.
181 d = self.when_tub_ready()
182 def _start_introducer_client(res):
183 ic.setServiceParent(self)
184 d.addCallback(_start_introducer_client)
185 d.addErrback(log.err, facility="tahoe.init",
186 level=log.BAD, umid="URyI5w")
188 def init_stats_provider(self):
189 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
190 self.stats_provider = StatsProvider(self, gatherer_furl)
191 self.add_service(self.stats_provider)
192 self.stats_provider.register_producer(self)
195 return { 'node.uptime': time.time() - self.started_timestamp }
197 def init_secrets(self):
198 lease_s = self.get_or_create_private_config("secret", _make_secret)
199 lease_secret = base32.a2b(lease_s)
200 convergence_s = self.get_or_create_private_config('convergence',
202 self.convergence = base32.a2b(convergence_s)
203 self._secret_holder = SecretHolder(lease_secret, self.convergence)
205 def init_storage(self):
206 # should we run a storage server (and publish it for others to use)?
207 if not self.get_config("storage", "enabled", True, boolean=True):
209 readonly = self.get_config("storage", "readonly", False, boolean=True)
211 storedir = os.path.join(self.basedir, self.STOREDIR)
213 data = self.get_config("storage", "reserved_space", None)
216 reserved = parse_abbreviated_size(data)
218 log.msg("[storage]reserved_space= contains unparseable value %s"
222 discard = self.get_config("storage", "debug_discard", False,
225 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
227 mode = self.get_config("storage", "expire.mode") # require a mode
229 mode = self.get_config("storage", "expire.mode", "age")
231 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
232 if o_l_d is not None:
233 o_l_d = parse_duration(o_l_d)
236 if mode == "cutoff-date":
237 cutoff_date = self.get_config("storage", "expire.cutoff_date")
238 cutoff_date = parse_date(cutoff_date)
241 if self.get_config("storage", "expire.immutable", True, boolean=True):
242 sharetypes.append("immutable")
243 if self.get_config("storage", "expire.mutable", True, boolean=True):
244 sharetypes.append("mutable")
245 expiration_sharetypes = tuple(sharetypes)
247 ss = StorageServer(storedir, self.nodeid,
248 reserved_space=reserved,
249 discard_storage=discard,
250 readonly_storage=readonly,
251 stats_provider=self.stats_provider,
252 expiration_enabled=expire,
253 expiration_mode=mode,
254 expiration_override_lease_duration=o_l_d,
255 expiration_cutoff_date=cutoff_date,
256 expiration_sharetypes=expiration_sharetypes)
259 d = self.when_tub_ready()
260 # we can't do registerReference until the Tub is ready
262 furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
263 furl = self.tub.registerReference(ss, furlFile=furl_file)
264 ri_name = RIStorageServer.__remote_name__
265 self.introducer_client.publish(furl, "storage", ri_name)
266 d.addCallback(_publish)
267 d.addErrback(log.err, facility="tahoe.init",
268 level=log.BAD, umid="aLGBKw")
270 def init_client(self):
271 helper_furl = self.get_config("client", "helper.furl", None)
272 DEP = self.DEFAULT_ENCODING_PARAMETERS
273 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
274 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
275 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
277 self.init_client_storage_broker()
278 self.history = History(self.stats_provider)
279 self.terminator = Terminator()
280 self.terminator.setServiceParent(self)
281 self.add_service(Uploader(helper_furl, self.stats_provider,
283 self.init_stub_client()
284 self.init_blacklist()
285 self.init_nodemaker()
287 def init_client_storage_broker(self):
288 # create a StorageFarmBroker object, for use by Uploader/Downloader
289 # (and everybody else who wants to use storage servers)
290 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
291 self.storage_broker = sb
293 # load static server specifications from tahoe.cfg, if any.
294 # Not quite ready yet.
295 #if self.config.has_section("client-server-selection"):
296 # server_params = {} # maps serverid to dict of parameters
297 # for (name, value) in self.config.items("client-server-selection"):
298 # pieces = name.split(".")
299 # if pieces[0] == "server":
300 # serverid = pieces[1]
301 # if serverid not in server_params:
302 # server_params[serverid] = {}
303 # server_params[serverid][pieces[2]] = value
304 # for serverid, params in server_params.items():
305 # server_type = params.pop("type")
306 # if server_type == "tahoe-foolscap":
307 # s = storage_client.NativeStorageClient(*params)
309 # msg = ("unrecognized server type '%s' in "
310 # "tahoe.cfg [client-server-selection]server.%s.type"
311 # % (server_type, serverid))
312 # raise storage_client.UnknownServerTypeError(msg)
313 # sb.add_server(s.serverid, s)
315 # check to see if we're supposed to use the introducer too
316 if self.get_config("client-server-selection", "use_introducer",
317 default=True, boolean=True):
318 sb.use_introducer(self.introducer_client)
320 def get_storage_broker(self):
321 return self.storage_broker
323 def init_stub_client(self):
325 # we publish an empty object so that the introducer can count how
326 # many clients are connected and see what versions they're
329 furl = self.tub.registerReference(sc)
330 ri_name = RIStubClient.__remote_name__
331 self.introducer_client.publish(furl, "stub_client", ri_name)
332 d = self.when_tub_ready()
333 d.addCallback(_publish)
334 d.addErrback(log.err, facility="tahoe.init",
335 level=log.BAD, umid="OEHq3g")
337 def init_blacklist(self):
338 fn = os.path.join(self.basedir, "access.blacklist")
339 self.blacklist = Blacklist(fn)
341 def init_nodemaker(self):
342 default = self.get_config("client", "mutable.format", default="SDMF")
343 if default.upper() == "MDMF":
344 self.mutable_file_default = MDMF_VERSION
346 self.mutable_file_default = SDMF_VERSION
347 self.nodemaker = NodeMaker(self.storage_broker,
350 self.getServiceNamed("uploader"),
352 self.get_encoding_parameters(),
353 self.mutable_file_default,
357 def get_history(self):
360 def init_control(self):
361 d = self.when_tub_ready()
364 c.setServiceParent(self)
365 control_url = self.tub.registerReference(c)
366 self.write_private_config("control.furl", control_url + "\n")
367 d.addCallback(_publish)
368 d.addErrback(log.err, facility="tahoe.init",
369 level=log.BAD, umid="d3tNXA")
371 def init_helper(self):
372 d = self.when_tub_ready()
374 self.helper = Helper(os.path.join(self.basedir, "helper"),
375 self.storage_broker, self._secret_holder,
376 self.stats_provider, self.history)
377 # TODO: this is confusing. BASEDIR/private/helper.furl is created
378 # by the helper. BASEDIR/helper.furl is consumed by the client
379 # who wants to use the helper. I like having the filename be the
380 # same, since that makes 'cp' work smoothly, but the difference
381 # between config inputs and generated outputs is hard to see.
382 helper_furlfile = os.path.join(self.basedir,
383 "private", "helper.furl").encode(get_filesystem_encoding())
384 self.tub.registerReference(self.helper, furlFile=helper_furlfile)
385 d.addCallback(_publish)
386 d.addErrback(log.err, facility="tahoe.init",
387 level=log.BAD, umid="K0mW5w")
389 def init_key_gen(self, key_gen_furl):
390 d = self.when_tub_ready()
391 def _subscribe(self):
392 self.tub.connectTo(key_gen_furl, self._got_key_generator)
393 d.addCallback(_subscribe)
394 d.addErrback(log.err, facility="tahoe.init",
395 level=log.BAD, umid="z9DMzw")
397 def _got_key_generator(self, key_generator):
398 self._key_generator.set_remote_generator(key_generator)
399 key_generator.notifyOnDisconnect(self._lost_key_generator)
401 def _lost_key_generator(self):
402 self._key_generator.set_remote_generator(None)
404 def set_default_mutable_keysize(self, keysize):
405 self._key_generator.set_default_keysize(keysize)
407 def init_web(self, webport):
408 self.log("init_web(webport=%s)", args=(webport,))
410 from allmydata.webish import WebishServer
411 nodeurl_path = os.path.join(self.basedir, "node.url")
412 staticdir = self.get_config("node", "web.static", "public_html")
413 staticdir = os.path.expanduser(staticdir)
414 ws = WebishServer(self, webport, nodeurl_path, staticdir)
417 def init_ftp_server(self):
418 if self.get_config("ftpd", "enabled", False, boolean=True):
419 accountfile = self.get_config("ftpd", "accounts.file", None)
420 accounturl = self.get_config("ftpd", "accounts.url", None)
421 ftp_portstr = self.get_config("ftpd", "port", "8021")
423 from allmydata.frontends import ftpd
424 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
425 s.setServiceParent(self)
427 def init_sftp_server(self):
428 if self.get_config("sftpd", "enabled", False, boolean=True):
429 accountfile = self.get_config("sftpd", "accounts.file", None)
430 accounturl = self.get_config("sftpd", "accounts.url", None)
431 sftp_portstr = self.get_config("sftpd", "port", "8022")
432 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
433 privkey_file = self.get_config("sftpd", "host_privkey_file")
435 from allmydata.frontends import sftpd
436 s = sftpd.SFTPServer(self, accountfile, accounturl,
437 sftp_portstr, pubkey_file, privkey_file)
438 s.setServiceParent(self)
440 def init_drop_uploader(self):
441 if self.get_config("drop_upload", "enabled", False, boolean=True):
442 upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
443 local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
445 if upload_dircap and local_dir_utf8:
447 from allmydata.frontends import drop_upload
448 s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
449 s.setServiceParent(self)
452 self.log("couldn't start drop-uploader: %r", args=(e,))
454 self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
456 def _check_hotline(self, hotline_file):
457 if os.path.exists(hotline_file):
458 mtime = os.stat(hotline_file)[stat.ST_MTIME]
459 if mtime > time.time() - 120.0:
462 self.log("hotline file too old, shutting down")
464 self.log("hotline file missing, shutting down")
467 def get_encoding_parameters(self):
468 return self.DEFAULT_ENCODING_PARAMETERS
470 def connected_to_introducer(self):
471 if self.introducer_client:
472 return self.introducer_client.connected_to_introducer()
475 def get_renewal_secret(self): # this will go away
476 return self._secret_holder.get_renewal_secret()
478 def get_cancel_secret(self):
479 return self._secret_holder.get_cancel_secret()
481 def debug_wait_for_client_connections(self, num_clients):
482 """Return a Deferred that fires (with None) when we have connections
483 to the given number of peers. Useful for tests that set up a
484 temporary test network and need to know when it is safe to proceed
485 with an upload or download."""
487 return len(self.storage_broker.get_connected_servers()) >= num_clients
488 d = self.poll(_check, 0.5)
489 d.addCallback(lambda res: None)
493 # these four methods are the primitives for creating filenodes and
494 # dirnodes. The first takes a URI and produces a filenode or (new-style)
495 # dirnode. The other three create brand-new filenodes/dirnodes.
497 def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
498 # This returns synchronously.
499 # Note that it does *not* validate the write_uri and read_uri; instead we
500 # may get an opaque node if there were any problems.
501 return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
503 def create_dirnode(self, initial_children={}, version=None):
504 d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
507 def create_immutable_dirnode(self, children, convergence=None):
508 return self.nodemaker.create_immutable_directory(children, convergence)
510 def create_mutable_file(self, contents=None, keysize=None, version=None):
511 return self.nodemaker.create_mutable_file(contents, keysize,
514 def upload(self, uploadable):
515 uploader = self.getServiceNamed("uploader")
516 return uploader.upload(uploadable)