1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26 SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
class StubClient(Referenceable):
    # A minimal remotely-referenceable object: it carries no methods of its
    # own and only declares the RIStubClient marker interface, so that an
    # introducer can identify/count connected clients.
    implements(RIStubClient)
41 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
44 def __init__(self, lease_secret, convergence_secret):
45 self._lease_secret = lease_secret
46 self._convergence_secret = convergence_secret
48 def get_renewal_secret(self):
49 return hashutil.my_renewal_secret_hash(self._lease_secret)
51 def get_cancel_secret(self):
52 return hashutil.my_cancel_secret_hash(self._lease_secret)
54 def get_convergence_secret(self):
55 return self._convergence_secret
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
        # NOTE(review): the enclosing "def __init__(self):" line is not
        # visible in this view.
        # Built-in fallback keysize in bits; see set_default_keysize().
        self.default_keysize = 2048
66 def set_remote_generator(self, keygen):
    # NOTE(review): the docstring below appears truncated in this view (the
    # closing quotes are not visible).
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        self.default_keysize = keysize
    # NOTE(review): this method is only partially visible -- the docstring
    # terminator and the "if self._remote:" / "else:" framing around the
    # remote-vs-local branches are not shown in this view.
    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        # Explicit argument wins; otherwise fall back to the stored default.
        keysize = keysize or self.default_keysize
        # Remote path: ask the key-generator service for a pair of
        # serialized keys, then rehydrate them into pycryptopp objects.
        d = self._remote.callRemote('get_rsa_key_pair', keysize)
        # NOTE(review): Python-2-only tuple-unpacking parameter syntax below;
        # its "return (v, s)" line is not visible in this view.
        def make_key_objs((verifying_key, signing_key)):
            v = rsa.create_verifying_key_from_string(verifying_key)
            s = rsa.create_signing_key_from_string(signing_key)
        d.addCallback(make_key_objs)
        # Local path (presumably the else-branch): generate in-process.
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        signer = rsa.generate(keysize)
        verifier = signer.get_verifying_key()
        return defer.succeed( (verifier, signer) )
class Terminator(service.Service):
    # Keeps weak references to registered client-like objects so that, at
    # service shutdown, each still-live one can be acted upon. Weak keys
    # mean registration never keeps a client alive by itself.
    # NOTE(review): the "def __init__(self):" line is not visible here.
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        # Value is irrelevant; only weak-key membership matters.
        self._clients[c] = None
    def stopService(self):
        # NOTE(review): the per-client statement inside this loop is not
        # visible in this view.
        for c in self._clients:
        return service.Service.stopService(self)
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)
    # File (under BASEDIR) recording our listening port.
    PORTNUMFILE = "client.port"
    # Marker file whose mtime keeps the node alive; see _check_hotline.
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"
    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   # NOTE(review): the "happy" and "n"
                                   # entries and the closing brace are not
                                   # visible in this view.
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        # Base node setup (tub, config, etc.) happens in the superclass.
        node.Node.__init__(self, basedir)
        # Recorded so get_stats can report node.uptime.
        self.started_timestamp = time.time()
        self.logSource="Client"
        # Copy the class-level dict so per-instance tweaks (init_client)
        # never mutate the shared class attribute.
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        # NOTE(review): several init_* calls between here and the helper
        # check are not visible in this view; the bodies of the two "if"
        # guards below are likewise not shown.
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            # Poll every second; _check_hotline shuts us down when the file
            # goes stale or disappears.
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)
        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        self.init_web(webport) # strports string
    def init_introducer_client(self):
        # [client]introducer.furl is mandatory: no default is supplied.
        self.introducer_furl = self.get_config("client", "introducer.furl")
        # NOTE(review): at least one constructor argument line (between the
        # furl and the version string) is not visible in this view.
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
188 def init_stats_provider(self):
189 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
190 self.stats_provider = StatsProvider(self, gatherer_furl)
191 self.add_service(self.stats_provider)
192 self.stats_provider.register_producer(self)
195 return { 'node.uptime': time.time() - self.started_timestamp }
    def init_secrets(self):
        # Load-or-create the persistent secrets from BASEDIR/private/,
        # decode them from base32, and wrap them in a SecretHolder.
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        # NOTE(review): the continuation line carrying this call's second
        # argument is not visible in this view.
        convergence_s = self.get_or_create_private_config('convergence',
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)
    # NOTE(review): this method is only partially visible -- the early
    # return, the try/except frames around reserved_space and expire.mode
    # parsing, the sharetypes initialization, the service attachment, and
    # the "def _publish(res):" line are all missing from this view.
    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # [storage]reserved_space accepts abbreviated sizes (e.g. "1G").
        data = self.get_config("storage", "reserved_space", None)
        reserved = parse_abbreviated_size(data)
        log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        # The two "mode =" lines below are alternates (presumably the
        # require-a-mode form sits in a try: and the "age" default in its
        # except: -- the framing lines are not visible here).
        mode = self.get_config("storage", "expire.mode") # require a mode
        mode = self.get_config("storage", "expire.mode", "age")
        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            # e.g. "31days"; converted to seconds by parse_duration.
            o_l_d = parse_duration(o_l_d)
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        # Overlay tahoe.cfg [client]shares.* onto the per-instance copy of
        # the encoding parameters (copied in __init__, so the class-level
        # defaults are never mutated).
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        # Terminator tracks client-ish objects for shutdown.
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        # The Uploader service is later retrieved via getServiceNamed("uploader").
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.init_stub_client()
        self.init_blacklist()
        self.init_nodemaker()
    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb
        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)
        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
319 def get_storage_broker(self):
320 return self.storage_broker
    def init_stub_client(self):
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # NOTE(review): the "def _publish(res):" line and the StubClient
        # construction ("sc = ...") are not visible in this view; the three
        # indented lines below are presumably that callback's body.
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
336 def init_blacklist(self):
337 fn = os.path.join(self.basedir, "access.blacklist")
338 self.blacklist = Blacklist(fn)
    def init_nodemaker(self):
        # NOTE(review): several NodeMaker constructor arguments (and the
        # closing parenthesis) are not visible in this view.
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self.getServiceNamed("uploader"),
                                   self.get_encoding_parameters(),
        # [client]mutable.format selects the default mutable-file version.
        default = self.get_config("client", "mutable.format", default="sdmf")
        if default == "mdmf":
            self.mutable_file_default = MDMF_VERSION
            # NOTE(review): the "else:" line is not visible in this view;
            # SDMF is presumably the fallback for any other value.
            self.mutable_file_default = SDMF_VERSION
355 def get_history(self):
    def init_control(self):
        # Register the ControlServer once the Tub is ready, and persist its
        # FURL under BASEDIR/private/ for external tools.
        d = self.when_tub_ready()
        # NOTE(review): the "def _publish(res):" line and the ControlServer
        # construction are not visible in this view.
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        # Build the upload Helper after Tub startup and register it under a
        # persistent FURL.
        d = self.when_tub_ready()
        # NOTE(review): the "def _publish(res):" line is not visible in
        # this view; the block below is presumably that callback's body.
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
    def init_key_gen(self, key_gen_furl):
        # Once our Tub is ready, connect to the remote key-generator service
        # at key_gen_furl; _got_key_generator wires it into KeyGenerator.
        d = self.when_tub_ready()
        # NOTE(review): the callback parameter below is named 'self',
        # shadowing the method's own 'self'. This only behaves as intended
        # if when_tub_ready() fires its Deferred with this Client instance
        # -- confirm, or rename the parameter.
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")
    def _got_key_generator(self, key_generator):
        # Hand the remote reference to our KeyGenerator, and arrange to be
        # told about disconnection so we can clear it again (after which
        # generate() uses its local rsa.generate path).
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)
    def _lost_key_generator(self):
        # Remote key-generator went away: clear it so key generation falls
        # back to the local path.
        self._key_generator.set_remote_generator(None)
    def set_default_mutable_keysize(self, keysize):
        # Pure delegation: adjusts the default size (in bits) of RSA keys
        # created for future mutable files.
        self._key_generator.set_default_keysize(keysize)
    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))
        # Imported lazily so nodes without a web port never load the
        # webish machinery.
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        # NOTE(review): the line attaching ws as a child service is not
        # visible in this view.
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
415 def init_ftp_server(self):
416 if self.get_config("ftpd", "enabled", False, boolean=True):
417 accountfile = self.get_config("ftpd", "accounts.file", None)
418 accounturl = self.get_config("ftpd", "accounts.url", None)
419 ftp_portstr = self.get_config("ftpd", "port", "8021")
421 from allmydata.frontends import ftpd
422 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
423 s.setServiceParent(self)
425 def init_sftp_server(self):
426 if self.get_config("sftpd", "enabled", False, boolean=True):
427 accountfile = self.get_config("sftpd", "accounts.file", None)
428 accounturl = self.get_config("sftpd", "accounts.url", None)
429 sftp_portstr = self.get_config("sftpd", "port", "8022")
430 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
431 privkey_file = self.get_config("sftpd", "host_privkey_file")
433 from allmydata.frontends import sftpd
434 s = sftpd.SFTPServer(self, accountfile, accounturl,
435 sftp_portstr, pubkey_file, privkey_file)
436 s.setServiceParent(self)
    # NOTE(review): the try:/except framing around the DropUploader
    # construction and the else: for the final log call are not visible in
    # this view ('e' below is presumably the caught exception).
    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
            local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
            if upload_dircap and local_dir_utf8:
                # Imported lazily so nodes without drop-upload never load it.
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                self.log("couldn't start drop-uploader: %r", args=(e,))
                self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
    # NOTE(review): the early return for a fresh file, the else: branches,
    # and the actual shutdown calls are not visible in this view.
    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            # File must have been touched within the last 120 seconds to
            # keep the node alive.
            if mtime > time.time() - 120.0:
                self.log("hotline file too old, shutting down")
                self.log("hotline file missing, shutting down")
465 def get_encoding_parameters(self):
466 return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        # Delegate to the IntroducerClient when we have one.
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        # NOTE(review): the fallback return for the no-introducer case is
        # not visible in this view.
473 def get_renewal_secret(self): # this will go away
474 return self._secret_holder.get_renewal_secret()
476 def get_cancel_secret(self):
477 return self._secret_holder.get_cancel_secret()
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the "def _check():" line and the final "return d"
        # are not visible in this view; polling re-runs the predicate every
        # 0.5 seconds.
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
491 # these four methods are the primitives for creating filenodes and
492 # dirnodes. The first takes a URI and produces a filenode or (new-style)
493 # dirnode. The other three create brand-new filenodes/dirnodes.
495 def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
496 # This returns synchronously.
497 # Note that it does *not* validate the write_uri and read_uri; instead we
498 # may get an opaque node if there were any problems.
499 return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
501 def create_dirnode(self, initial_children={}, version=SDMF_VERSION):
502 d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
505 def create_immutable_dirnode(self, children, convergence=None):
506 return self.nodemaker.create_immutable_directory(children, convergence)
    # NOTE(review): the guard line before the default-version assignment
    # (presumably "if version is None:") and the continuation of the final
    # call (presumably "version=version)") are not visible in this view.
    def create_mutable_file(self, contents=None, keysize=None, version=None):
            # Fall back to the format chosen in init_nodemaker
            # ([client]mutable.format: SDMF or MDMF).
            version = self.mutable_file_default
        return self.nodemaker.create_mutable_file(contents, keysize,
514 def upload(self, uploadable):
515 uploader = self.getServiceNamed("uploader")
516 return uploader.upload(uploadable, history=self.get_history())