import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient
from allmydata.nodemaker import NodeMaker

KiB = 1024
MiB = 1024*KiB
GiB = 1024*MiB
TiB = 1024*GiB
PiB = 1024*TiB
class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret
56 """I create RSA keys for mutable files. Each call to generate() returns a
57 single keypair. The keysize is specified first by the keysize= argument
58 to generate(), then with a default set by set_default_keysize(), then
59 with a built-in default of 2048 bits."""
62 self.default_keysize = 2048
64 def set_remote_generator(self, keygen):
66 def set_default_keysize(self, keysize):
67 """Call this to override the size of the RSA keys created for new
68 mutable files which don't otherwise specify a size. This will affect
69 all subsequent calls to generate() without a keysize= argument. The
70 default size is 2048 bits. Test cases should call this method once
71 during setup, to cause me to create smaller (522 bit) keys, so the
72 unit tests run faster."""
73 self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (522 bit keys are fast for testing,
        2048 bit keys are standard). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs, so without a remote key generator we block here while the
            # key is created locally.
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
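
# Example use of KeyGenerator (a sketch, not part of the startup path; the
# Client below creates one in __init__ and tests shrink the key size):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)   # small keys keep unit tests fast
#   d = kg.generate()             # Deferred -> (verifying_key, signing_key)
#   def _got_keypair((verifying_key, signing_key)):
#       pass # use the keypair to create a mutable file
#   d.addCallback(_got_keypair)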

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # these are the default erasure-coding parameters. 'k' (needed) is the
    # number of shares required to reconstruct a file. 'happy' (desired)
    # means that we will abort an upload unless we can allocate space for at
    # least this many. 'n' (total) is the total number of shares created by
    # encoding. If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
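
    # These parameters can be overridden per-node in tahoe.cfg; init_client()
    # below reads them from the [client] section, for example:
    #
    #   [client]
    #   shares.needed = 3
    #   shares.happy = 7
    #   shares.total = 10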

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
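
    # A corresponding tahoe.cfg [storage] section might look like this (the
    # values are illustrative; reserved_space takes an abbreviated size):
    #
    #   [storage]
    #   enabled = true
    #   readonly = false
    #   reserved_space = 1G
    #   expire.enabled = false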

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.init_stub_client()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_nodemaker(self):
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self._key_generator)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
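
    # A minimal [sftpd] section in tahoe.cfg might look like this (the file
    # paths are illustrative):
    #
    #   [sftpd]
    #   enabled = true
    #   port = 8022
    #   host_pubkey_file = private/ssh_host_rsa_key.pub
    #   host_privkey_file = private/ssh_host_rsa_key
    #   accounts.file = private/accounts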

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
            local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)

            if upload_dircap and local_dir_utf8:
                try:
                    from allmydata.frontends import drop_upload
                    s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                    s.setServiceParent(self)
                except Exception, e:
                    self.log("couldn't start drop-uploader: %r", args=(e,))
            else:
                self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
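
    # A corresponding tahoe.cfg section might look like this (the dircap is a
    # placeholder for a real directory writecap; local.directory must be a
    # UTF-8 encoded path):
    #
    #   [drop_upload]
    #   enabled = true
    #   upload.dircap = URI:DIR2:...
    #   local.directory = /home/user/incoming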

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
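
    # The hotline file acts as a dead-man switch: whatever created the node
    # (e.g. a test harness) must keep touching
    # BASEDIR/suicide_prevention_hotline at least every 120 seconds, roughly:
    #
    #   open(os.path.join(basedir, "suicide_prevention_hotline"), "w").close()
    #
    # otherwise _check_hotline() above stops the reactor and the node shuts
    # down.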

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
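
    # Example (in a test, assuming 'client' is a started Client on a grid
    # with at least 5 storage servers; _begin_upload is the test's next step):
    #
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: _begin_upload())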

    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}):
        d = self.nodemaker.create_new_mutable_directory(initial_children)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None):
        return self.nodemaker.create_mutable_file(contents, keysize)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
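
    # Example use of the primitives above (a sketch; 'client' is a started
    # Client, and Data is the in-memory uploadable from
    # allmydata.immutable.upload):
    #
    #   from allmydata.immutable.upload import Data
    #   d = client.create_dirnode()          # new mutable directory
    #   def _made_dir(dirnode):
    #       print dirnode.get_uri()
    #       # upload an immutable file using this node's convergence secret
    #       return client.upload(Data("some bytes", client.convergence))
    #   d.addCallback(_made_dir)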