1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26 SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
class StubClient(Referenceable):
    """Empty remote object published to the introducer; implements the
    (empty) RIStubClient interface so the introducer can count clients."""
    implements(RIStubClient)
40 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
43 def __init__(self, lease_secret, convergence_secret):
44 self._lease_secret = lease_secret
45 self._convergence_secret = convergence_secret
47 def get_renewal_secret(self):
48 return hashutil.my_renewal_secret_hash(self._lease_secret)
50 def get_cancel_secret(self):
51 return hashutil.my_cancel_secret_hash(self._lease_secret)
53 def get_convergence_secret(self):
54 return self._convergence_secret
57 """I create RSA keys for mutable files. Each call to generate() returns a
58 single keypair. The keysize is specified first by the keysize= argument
59 to generate(), then with a default set by set_default_keysize(), then
60 with a built-in default of 2048 bits."""
63 self.default_keysize = 2048
65 def set_remote_generator(self, keygen):
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        can run quickly."""
        self.default_keysize = keysize
    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        # NOTE(review): an `if self._remote:` guard presumably selects the
        # remote path below vs. the local path at the bottom — confirm
        # against upstream; several lines appear elided from this view.
        d = self._remote.callRemote('get_rsa_key_pair', keysize)
        def make_key_objs((verifying_key, signing_key)):
            # deserialize the keypair strings returned by the remote
            # key generator into pycryptopp key objects
            v = rsa.create_verifying_key_from_string(verifying_key)
            s = rsa.create_signing_key_from_string(signing_key)
        d.addCallback(make_key_objs)
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        # seconds, so the local fallback below blocks the reactor briefly.
        signer = rsa.generate(keysize)
        verifier = signer.get_verifying_key()
        return defer.succeed( (verifier, signer) )
class Terminator(service.Service):
    # Holds weak references to client-ish objects so stopping this service
    # can stop them without keeping them alive.
    # NOTE(review): the `def __init__(self):` header appears elided here;
    # the WeakKeyDictionary assignment presumably belongs to it.
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        # weak-key map: registration does not extend c's lifetime
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            # NOTE(review): the per-client stop call appears elided here.
        return service.Service.stopService(self)
class Client(node.Node, pollmixin.PollMixin):
    """Tahoe client node: ties introducer, storage, upload/download and
    frontend services together under one Twisted service hierarchy."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   # NOTE(review): the "happy"/"n" entries and
                                   # closing brace appear elided; KiB is
                                   # presumably defined earlier in the file.
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        """Build a Tahoe client node rooted at `basedir`, initializing its
        sub-services (introducer, stats, key generator, frontends, web) in
        dependency order."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # feeds the node.uptime stat
        self.logSource="Client"
        # copy so per-instance overrides from tahoe.cfg don't mutate the
        # class-level defaults shared by all Client instances
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        # NOTE(review): several init_* calls (secrets/storage/client) appear
        # elided from this view — confirm against upstream.
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()
        # a "hotline" file lets test harnesses keep the node alive; when it
        # goes stale the node shuts itself down (see _check_hotline)
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)
        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        self.init_web(webport) # strports string
    def init_introducer_client(self):
        """Create the IntroducerClient and attach it once our Tub is ready."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        # NOTE(review): an argument (presumably the nickname) appears elided
        # between the furl and version arguments — confirm upstream.
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
187 def init_stats_provider(self):
188 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
189 self.stats_provider = StatsProvider(self, gatherer_furl)
190 self.add_service(self.stats_provider)
191 self.stats_provider.register_producer(self)
194 return { 'node.uptime': time.time() - self.started_timestamp }
    def init_secrets(self):
        """Load (or create) the lease and convergence secrets and wrap them
        in a SecretHolder."""
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        # NOTE(review): the continuation line supplying the default-factory
        # argument to this call appears elided from this view.
        convergence_s = self.get_or_create_private_config('convergence',
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)
    def init_storage(self):
        """Configure a StorageServer from the [storage] section of tahoe.cfg,
        register it with the Tub, and publish its FURL to the introducer.
        No-op when [storage]enabled is false.

        NOTE(review): many lines (early return, try/except around parsing,
        sharetypes initialization, the `def _publish` header) appear elided
        from this view — confirm against upstream before relying on flow."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        data = self.get_config("storage", "reserved_space", None)
        # parse e.g. "5G" into a byte count; unparseable values are logged
        reserved = parse_abbreviated_size(data)
        log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        mode = self.get_config("storage", "expire.mode") # require a mode
        mode = self.get_config("storage", "expire.mode", "age")
        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            # parse e.g. "31days" into seconds
            o_l_d = parse_duration(o_l_d)
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
        # decide which share types the lease-expiration crawler may delete
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
269 def init_client(self):
270 helper_furl = self.get_config("client", "helper.furl", None)
271 DEP = self.DEFAULT_ENCODING_PARAMETERS
272 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
273 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
274 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
276 self.init_client_storage_broker()
277 self.history = History(self.stats_provider)
278 self.terminator = Terminator()
279 self.terminator.setServiceParent(self)
280 self.add_service(Uploader(helper_furl, self.stats_provider))
281 self.init_stub_client()
282 self.init_nodemaker()
    def init_client_storage_broker(self):
        """Create the StorageFarmBroker and (optionally) let it learn about
        servers from the introducer."""
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
317 def get_storage_broker(self):
318 return self.storage_broker
    def init_stub_client(self):
        """Publish a StubClient to the introducer once the Tub is ready."""
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # running.
        # NOTE(review): the `def _publish(res):` header and the StubClient
        # construction (`sc = ...`) appear elided; the three lines below
        # presumably belong inside that callback.
        furl = self.tub.registerReference(sc)
        ri_name = RIStubClient.__remote_name__
        self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def init_nodemaker(self):
        """Create the NodeMaker used to build filenodes and dirnodes, and
        pick the default mutable-file format from tahoe.cfg."""
        # NOTE(review): several NodeMaker constructor arguments and the
        # call's closing paren appear elided from this view.
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self.getServiceNamed("uploader"),
                                   self.get_encoding_parameters(),
        default = self.get_config("client", "mutable.format", default="sdmf")
        # map the config string onto a mutable-file version constant
        if default == "mdmf":
            self.mutable_file_default = MDMF_VERSION
        # NOTE(review): an `else:` appears elided before the SDMF fallback.
        self.mutable_file_default = SDMF_VERSION
348 def get_history(self):
    def init_control(self):
        """Once the Tub is ready, register a ControlServer and write its FURL
        to private/control.furl."""
        d = self.when_tub_ready()
        # NOTE(review): the `def _publish(res):` header and the ControlServer
        # construction (`c = ...`) appear elided here.
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """Create the upload Helper and register it under
        private/helper.furl once the Tub is ready."""
        d = self.when_tub_ready()
        # NOTE(review): the `def _publish(res):` header appears elided; the
        # statements below presumably run inside that callback.
        self.helper = Helper(os.path.join(self.basedir, "helper"),
                             self.storage_broker, self._secret_holder,
                             self.stats_provider, self.history)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl").encode(get_filesystem_encoding())
        self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
380 def init_key_gen(self, key_gen_furl):
381 d = self.when_tub_ready()
382 def _subscribe(self):
383 self.tub.connectTo(key_gen_furl, self._got_key_generator)
384 d.addCallback(_subscribe)
385 d.addErrback(log.err, facility="tahoe.init",
386 level=log.BAD, umid="z9DMzw")
388 def _got_key_generator(self, key_generator):
389 self._key_generator.set_remote_generator(key_generator)
390 key_generator.notifyOnDisconnect(self._lost_key_generator)
392 def _lost_key_generator(self):
393 self._key_generator.set_remote_generator(None)
395 def set_default_mutable_keysize(self, keysize):
396 self._key_generator.set_default_keysize(keysize)
    def init_web(self, webport):
        """Start the web frontend listening on `webport` (a strports string)."""
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        # NOTE(review): the line attaching `ws` to the service hierarchy
        # appears elided from this view.
408 def init_ftp_server(self):
409 if self.get_config("ftpd", "enabled", False, boolean=True):
410 accountfile = self.get_config("ftpd", "accounts.file", None)
411 accounturl = self.get_config("ftpd", "accounts.url", None)
412 ftp_portstr = self.get_config("ftpd", "port", "8021")
414 from allmydata.frontends import ftpd
415 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
416 s.setServiceParent(self)
418 def init_sftp_server(self):
419 if self.get_config("sftpd", "enabled", False, boolean=True):
420 accountfile = self.get_config("sftpd", "accounts.file", None)
421 accounturl = self.get_config("sftpd", "accounts.url", None)
422 sftp_portstr = self.get_config("sftpd", "port", "8022")
423 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
424 privkey_file = self.get_config("sftpd", "host_privkey_file")
426 from allmydata.frontends import sftpd
427 s = sftpd.SFTPServer(self, accountfile, accounturl,
428 sftp_portstr, pubkey_file, privkey_file)
429 s.setServiceParent(self)
    def init_drop_uploader(self):
        """Start the drop-upload frontend when enabled and fully configured;
        failures are logged rather than fatal."""
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
            local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)

            if upload_dircap and local_dir_utf8:
                # NOTE(review): a try: / except clause binding `e` appears
                # elided around this block — confirm against upstream.
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                self.log("couldn't start drop-uploader: %r", args=(e,))
            self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
    def _check_hotline(self, hotline_file):
        """TimerService callback: keep running only while the hotline file
        exists and was touched within the last 120 seconds."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                # NOTE(review): the early `return` for a fresh hotline file
                # appears elided here.
            self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
        # NOTE(review): the shutdown call itself (presumably reactor.stop())
        # appears elided from this view.
458 def get_encoding_parameters(self):
459 return self.DEFAULT_ENCODING_PARAMETERS
461 def connected_to_introducer(self):
462 if self.introducer_client:
463 return self.introducer_client.connected_to_introducer()
466 def get_renewal_secret(self): # this will go away
467 return self._secret_holder.get_renewal_secret()
469 def get_cancel_secret(self):
470 return self._secret_holder.get_cancel_secret()
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the `def _check():` header (and a trailing `return d`)
        # appear elided; the predicate below belongs inside that helper.
        return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
484 # these four methods are the primitives for creating filenodes and
485 # dirnodes. The first takes a URI and produces a filenode or (new-style)
486 # dirnode. The other three create brand-new filenodes/dirnodes.
488 def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
489 # This returns synchronously.
490 # Note that it does *not* validate the write_uri and read_uri; instead we
491 # may get an opaque node if there were any problems.
492 return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
494 def create_dirnode(self, initial_children={}, version=SDMF_VERSION):
495 d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
498 def create_immutable_dirnode(self, children, convergence=None):
499 return self.nodemaker.create_immutable_directory(children, convergence)
    def create_mutable_file(self, contents=None, keysize=None, version=None):
        """Create a new mutable file; when `version` is None the node-wide
        default format (SDMF/MDMF, per tahoe.cfg) is used."""
        # NOTE(review): an `if version is None:` guard and the trailing
        # arguments/closing paren of the call appear elided from this view.
        version = self.mutable_file_default
        return self.nodemaker.create_mutable_file(contents, keysize,
507 def upload(self, uploadable):
508 uploader = self.getServiceNamed("uploader")
509 return uploader.upload(uploadable, history=self.get_history())