2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient
26 from allmydata.nodemaker import NodeMaker
class StubClient(Referenceable):
    """A Referenceable that declares the RIStubClient remote interface."""
    implements(RIStubClient)
39 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
def __init__(self, lease_secret, convergence_secret):
    """Remember the two secrets served by my accessor methods.

    lease_secret: input to the renewal- and cancel-secret hashes.
    convergence_secret: returned unchanged by get_convergence_secret().
    """
    self._lease_secret = lease_secret
    self._convergence_secret = convergence_secret
def get_renewal_secret(self):
    """Return hashutil.my_renewal_secret_hash() of my lease secret."""
    lease_secret = self._lease_secret
    return hashutil.my_renewal_secret_hash(lease_secret)
def get_cancel_secret(self):
    """Return hashutil.my_cancel_secret_hash() of my lease secret."""
    lease_secret = self._lease_secret
    return hashutil.my_cancel_secret_hash(lease_secret)
def get_convergence_secret(self):
    """Return the convergence secret exactly as it was given to __init__."""
    return self._convergence_secret
56 """I create RSA keys for mutable files. Each call to generate() returns a
57 single keypair. The keysize is specified first by the keysize= argument
58 to generate(), then with a default set by set_default_keysize(), then
59 with a built-in default of 2048 bits."""
62 self.default_keysize = 2048
64 def set_remote_generator(self, keygen):
def set_default_keysize(self, keysize):
    """Call this to override the size of the RSA keys created for new
    mutable files which don't otherwise specify a size. This will affect
    all subsequent calls to generate() without a keysize= argument. The
    default size is 2048 bits. Test cases should call this method once
    during setup, to cause me to create smaller (522 bit) keys, so the
    unit tests run faster.

    keysize: the new default key size, in bits.
    """
    self.default_keysize = keysize
75 def generate(self, keysize=None):
76 """I return a Deferred that fires with a (verifyingkey, signingkey)
77 pair. I accept a keysize in bits (522 bit keys are fast for testing,
78 2048 bit keys are standard). If you do not provide a keysize, I will
79 use my default, which is set by a call to set_default_keysize(). If
80 set_default_keysize() has never been called, I will create 2048 bit
82 keysize = keysize or self.default_keysize
84 d = self._remote.callRemote('get_rsa_key_pair', keysize)
85 def make_key_objs((verifying_key, signing_key)):
86 v = rsa.create_verifying_key_from_string(verifying_key)
87 s = rsa.create_signing_key_from_string(signing_key)
89 d.addCallback(make_key_objs)
92 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
94 signer = rsa.generate(keysize)
95 verifier = signer.get_verifying_key()
96 return defer.succeed( (verifier, signer) )
99 class Client(node.Node, pollmixin.PollMixin):
100 implements(IStatsProducer)
102 PORTNUMFILE = "client.port"
105 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
107 # This means that if a storage server treats me as though I were a
108 # 1.0.0 storage client, it will work as they expect.
109 OLDEST_SUPPORTED_VERSION = "1.0.0"
# this is a dict of encoding parameters: "k" (needed), "happy" (desired),
# "n" (total) and "max_segment_size". 'needed' is the number of shares
# required to reconstruct a file. 'desired' means that we will abort an
# upload unless we can allocate space for at least this many. 'total' is
# the total number of shares created by encoding. If everybody has room
# then this is how many we will upload.
116 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
119 "max_segment_size": 128*KiB,
122 def __init__(self, basedir="."):
123 node.Node.__init__(self, basedir)
124 self.started_timestamp = time.time()
125 self.logSource="Client"
126 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
127 self.init_introducer_client()
128 self.init_stats_provider()
133 if self.get_config("helper", "enabled", False, boolean=True):
135 self._key_generator = KeyGenerator()
136 key_gen_furl = self.get_config("client", "key_generator.furl", None)
138 self.init_key_gen(key_gen_furl)
140 # ControlServer and Helper are attached after Tub startup
141 self.init_ftp_server()
142 self.init_sftp_server()
144 hotline_file = os.path.join(self.basedir,
145 self.SUICIDE_PREVENTION_HOTLINE_FILE)
146 if os.path.exists(hotline_file):
147 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
148 self.log("hotline file noticed (%ds old), starting timer" % age)
149 hotline = TimerService(1.0, self._check_hotline, hotline_file)
150 hotline.setServiceParent(self)
152 # this needs to happen last, so it can use getServiceNamed() to
153 # acquire references to StorageServer and other web-statusable things
154 webport = self.get_config("node", "web.port", None)
156 self.init_web(webport) # strports string
def read_old_config_files(self):
    """Pull settings out of old-style per-option files in the base directory.

    The base class handles its own files first. Then each legacy *.furl
    file is handed to _copy_config_from_file for the [client] section, and
    finally the mere existence of each marker file sets one option.
    """
    node.Node.read_old_config_files(self)

    # legacy furl files: the option carries the same name as the file
    copy_from_file = self._copy_config_from_file
    for furl_name in ("introducer.furl", "helper.furl",
                      "key_generator.furl", "stats_gatherer.furl"):
        copy_from_file(furl_name, "client", furl_name)

    # marker files: (filename, section, option, value set when it exists)
    marker_files = (
        ("no_storage", "storage", "enabled", "false"),
        ("readonly_storage", "storage", "readonly", "true"),
        ("debug_discard_storage", "storage", "debug_discard", "true"),
        ("run_helper", "helper", "enabled", "true"),
    )
    for marker, section, option, value in marker_files:
        if os.path.exists(os.path.join(self.basedir, marker)):
            self.set_config(section, option, value)
174 def init_introducer_client(self):
175 self.introducer_furl = self.get_config("client", "introducer.furl")
176 ic = IntroducerClient(self.tub, self.introducer_furl,
178 str(allmydata.__full_version__),
179 str(self.OLDEST_SUPPORTED_VERSION))
180 self.introducer_client = ic
181 # hold off on starting the IntroducerClient until our tub has been
182 # started, so we'll have a useful address on our RemoteReference, so
183 # that the introducer's status page will show us.
184 d = self.when_tub_ready()
185 def _start_introducer_client(res):
186 ic.setServiceParent(self)
187 d.addCallback(_start_introducer_client)
188 d.addErrback(log.err, facility="tahoe.init",
189 level=log.BAD, umid="URyI5w")
def init_stats_provider(self):
    """Create the StatsProvider, attach it as a service, and register
    myself with it as a stats producer.

    The gatherer FURL comes from [client]stats_gatherer.furl and is None
    when that option is not configured.
    """
    provider = StatsProvider(
        self, self.get_config("client", "stats_gatherer.furl", None))
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
198 return { 'node.uptime': time.time() - self.started_timestamp }
200 def init_secrets(self):
201 lease_s = self.get_or_create_private_config("secret", _make_secret)
202 lease_secret = base32.a2b(lease_s)
203 convergence_s = self.get_or_create_private_config('convergence',
205 self.convergence = base32.a2b(convergence_s)
206 self._secret_holder = SecretHolder(lease_secret, self.convergence)
208 def init_storage(self):
209 # should we run a storage server (and publish it for others to use)?
210 if not self.get_config("storage", "enabled", True, boolean=True):
212 readonly = self.get_config("storage", "readonly", False, boolean=True)
214 storedir = os.path.join(self.basedir, self.STOREDIR)
216 data = self.get_config("storage", "reserved_space", None)
219 reserved = parse_abbreviated_size(data)
221 log.msg("[storage]reserved_space= contains unparseable value %s"
225 discard = self.get_config("storage", "debug_discard", False,
228 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
230 mode = self.get_config("storage", "expire.mode") # require a mode
232 mode = self.get_config("storage", "expire.mode", "age")
234 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
235 if o_l_d is not None:
236 o_l_d = parse_duration(o_l_d)
239 if mode == "cutoff-date":
240 cutoff_date = self.get_config("storage", "expire.cutoff_date")
241 cutoff_date = parse_date(cutoff_date)
244 if self.get_config("storage", "expire.immutable", True, boolean=True):
245 sharetypes.append("immutable")
246 if self.get_config("storage", "expire.mutable", True, boolean=True):
247 sharetypes.append("mutable")
248 expiration_sharetypes = tuple(sharetypes)
250 ss = StorageServer(storedir, self.nodeid,
251 reserved_space=reserved,
252 discard_storage=discard,
253 readonly_storage=readonly,
254 stats_provider=self.stats_provider,
255 expiration_enabled=expire,
256 expiration_mode=mode,
257 expiration_override_lease_duration=o_l_d,
258 expiration_cutoff_date=cutoff_date,
259 expiration_sharetypes=expiration_sharetypes)
262 d = self.when_tub_ready()
263 # we can't do registerReference until the Tub is ready
265 furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
266 furl = self.tub.registerReference(ss, furlFile=furl_file)
267 ri_name = RIStorageServer.__remote_name__
268 self.introducer_client.publish(furl, "storage", ri_name)
269 d.addCallback(_publish)
270 d.addErrback(log.err, facility="tahoe.init",
271 level=log.BAD, umid="aLGBKw")
def init_client(self):
    """Set up the client-side pieces: encoding parameters, storage broker,
    history, uploader, download cache manager, downloader, stub client and
    nodemaker, in that order."""
    helper_furl = self.get_config("client", "helper.furl", None)

    # let the [client] section override each share-encoding parameter,
    # passing the current value as the fallback
    params = self.DEFAULT_ENCODING_PARAMETERS
    for key, option in (("k", "shares.needed"),
                        ("n", "shares.total"),
                        ("happy", "shares.happy")):
        params[key] = int(self.get_config("client", option, params[key]))

    self.init_client_storage_broker()
    self.history = History(self.stats_provider)
    self.add_service(Uploader(helper_furl, self.stats_provider))

    cache_path = os.path.join(self.basedir, "private", "cache", "download")
    dirman = cachedir.CacheDirectoryManager(cache_path)
    self.download_cache_dirman = dirman
    dirman.setServiceParent(self)

    self.downloader = Downloader(self.storage_broker, self.stats_provider)
    self.init_stub_client()
    self.init_nodemaker()
def init_client_storage_broker(self):
    """Create the StorageFarmBroker and keep it as self.storage_broker.

    The broker is pointed at the introducer client unless the
    [client-server-selection]use_introducer option is false (it defaults
    to true).
    """
    # create a StorageFarmBroker object, for use by Uploader/Downloader
    # (and everybody else who wants to use storage servers)
    sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
    self.storage_broker = sb

    # load static server specifications from tahoe.cfg, if any.
    # Not quite ready yet.
    #if self.config.has_section("client-server-selection"):
    #    server_params = {} # maps serverid to dict of parameters
    #    for (name, value) in self.config.items("client-server-selection"):
    #        pieces = name.split(".")
    #        if pieces[0] == "server":
    #            serverid = pieces[1]
    #            if serverid not in server_params:
    #                server_params[serverid] = {}
    #            server_params[serverid][pieces[2]] = value
    #    for serverid, params in server_params.items():
    #        server_type = params.pop("type")
    #        if server_type == "tahoe-foolscap":
    #            s = storage_client.NativeStorageClient(*params)
    #        NOTE(review): an "else:" presumably belongs here, guarding the
    #        error below -- confirm before enabling this code.
    #            msg = ("unrecognized server type '%s' in "
    #                   "tahoe.cfg [client-server-selection]server.%s.type"
    #                   % (server_type, serverid))
    #            raise storage_client.UnknownServerTypeError(msg)
    #        sb.add_server(s.serverid, s)

    # check to see if we're supposed to use the introducer too
    if self.get_config("client-server-selection", "use_introducer",
                       default=True, boolean=True):
        sb.use_introducer(self.introducer_client)
def get_storage_broker(self):
    """Return the storage broker stored in self.storage_broker."""
    return self.storage_broker
327 def init_stub_client(self):
329 # we publish an empty object so that the introducer can count how
330 # many clients are connected and see what versions they're
333 furl = self.tub.registerReference(sc)
334 ri_name = RIStubClient.__remote_name__
335 self.introducer_client.publish(furl, "stub_client", ri_name)
336 d = self.when_tub_ready()
337 d.addCallback(_publish)
338 d.addErrback(log.err, facility="tahoe.init",
339 level=log.BAD, umid="OEHq3g")
341 def init_nodemaker(self):
342 self.nodemaker = NodeMaker(self.storage_broker,
345 self.getServiceNamed("uploader"),
347 self.download_cache_dirman,
348 self.get_encoding_parameters(),
351 def get_history(self):
354 def init_control(self):
355 d = self.when_tub_ready()
358 c.setServiceParent(self)
359 control_url = self.tub.registerReference(c)
360 self.write_private_config("control.furl", control_url + "\n")
361 d.addCallback(_publish)
362 d.addErrback(log.err, facility="tahoe.init",
363 level=log.BAD, umid="d3tNXA")
365 def init_helper(self):
366 d = self.when_tub_ready()
368 self.helper = Helper(os.path.join(self.basedir, "helper"),
369 self.storage_broker, self._secret_holder,
370 self.stats_provider, self.history)
371 # TODO: this is confusing. BASEDIR/private/helper.furl is created
372 # by the helper. BASEDIR/helper.furl is consumed by the client
373 # who wants to use the helper. I like having the filename be the
374 # same, since that makes 'cp' work smoothly, but the difference
375 # between config inputs and generated outputs is hard to see.
376 helper_furlfile = os.path.join(self.basedir,
377 "private", "helper.furl").encode(get_filesystem_encoding())
378 self.tub.registerReference(self.helper, furlFile=helper_furlfile)
379 d.addCallback(_publish)
380 d.addErrback(log.err, facility="tahoe.init",
381 level=log.BAD, umid="K0mW5w")
def init_key_gen(self, key_gen_furl):
    """Arrange to connect to a remote key generator once the Tub is ready.

    key_gen_furl: FURL handed to self.tub.connectTo(), with
    _got_key_generator as the connection callback.

    Returns None. A failure raised while subscribing is routed to
    log.err and not propagated to the caller.
    """
    d = self.when_tub_ready()
    def _subscribe(res):
        # 'res' (whatever the tub-ready Deferred fired with) is ignored.
        # The enclosing method's self is used so that the callback
        # argument no longer shadows it, matching the other
        # when_tub_ready() callbacks in this class.
        self.tub.connectTo(key_gen_furl, self._got_key_generator)
    d.addCallback(_subscribe)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="z9DMzw")
def _got_key_generator(self, key_generator):
    """Connection callback for the remote key generator.

    Hands the remote reference to my KeyGenerator, then registers
    _lost_key_generator with the reference's notifyOnDisconnect().
    """
    self._key_generator.set_remote_generator(key_generator)
    key_generator.notifyOnDisconnect(self._lost_key_generator)
def _lost_key_generator(self):
    """Clear my KeyGenerator's remote generator by setting it to None."""
    self._key_generator.set_remote_generator(None)
def set_default_mutable_keysize(self, keysize):
    """Forward keysize to my KeyGenerator's set_default_keysize()."""
    self._key_generator.set_default_keysize(keysize)
401 def init_web(self, webport):
402 self.log("init_web(webport=%s)", args=(webport,))
404 from allmydata.webish import WebishServer
405 nodeurl_path = os.path.join(self.basedir, "node.url")
406 staticdir = self.get_config("node", "web.static", "public_html")
407 staticdir = os.path.expanduser(staticdir)
408 ws = WebishServer(self, webport, nodeurl_path, staticdir)
def init_ftp_server(self):
    """Attach an FTP frontend service when [ftpd]enabled is true.

    Does nothing when the option is false (the default). The account
    file, account URL and port string are read from the [ftpd] section;
    the port string defaults to "8021".
    """
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return

    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    ftp_portstr = self.get_config("ftpd", "port", "8021")

    # the frontend module is only imported once we know it is enabled
    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
    server.setServiceParent(self)
def init_sftp_server(self):
    """Attach an SFTP frontend service when [sftpd]enabled is true.

    Does nothing when the option is false (the default). The account
    file, account URL, port string and host key files are read from the
    [sftpd] section; the port string defaults to "8022", and the two host
    key options are requested without a default.
    """
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return

    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    sftp_portstr = self.get_config("sftpd", "port", "8022")
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")

    # the frontend module is only imported once we know it is enabled
    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              sftp_portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
434 def _check_hotline(self, hotline_file):
435 if os.path.exists(hotline_file):
436 mtime = os.stat(hotline_file)[stat.ST_MTIME]
437 if mtime > time.time() - 120.0:
440 self.log("hotline file too old, shutting down")
442 self.log("hotline file missing, shutting down")
def get_encoding_parameters(self):
    """Return my encoding-parameters dict (the object itself, not a copy)."""
    return self.DEFAULT_ENCODING_PARAMETERS
448 def connected_to_introducer(self):
449 if self.introducer_client:
450 return self.introducer_client.connected_to_introducer()
def get_renewal_secret(self): # this will go away
    """Return the renewal secret, delegating to my secret holder."""
    return self._secret_holder.get_renewal_secret()
def get_cancel_secret(self):
    """Return the cancel secret, delegating to my secret holder."""
    return self._secret_holder.get_cancel_secret()
459 def debug_wait_for_client_connections(self, num_clients):
460 """Return a Deferred that fires (with None) when we have connections
461 to the given number of peers. Useful for tests that set up a
462 temporary test network and need to know when it is safe to proceed
463 with an upload or download."""
465 return len(self.storage_broker.get_all_servers()) >= num_clients
466 d = self.poll(_check, 0.5)
467 d.addCallback(lambda res: None)
471 # these four methods are the primitives for creating filenodes and
472 # dirnodes. The first takes a URI and produces a filenode or (new-style)
473 # dirnode. The other three create brand-new filenodes/dirnodes.
def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
    """Build a node for the given cap(s) by delegating to the nodemaker.

    This returns synchronously. Note that it does *not* validate the
    write_uri and read_uri; instead we may get an opaque node if there
    were any problems.
    """
    make_node = self.nodemaker.create_from_cap
    return make_node(write_uri, read_uri,
                     deep_immutable=deep_immutable, name=name)
481 def create_dirnode(self, initial_children={}):
482 d = self.nodemaker.create_new_mutable_directory(initial_children)
def create_immutable_dirnode(self, children, convergence=None):
    """Delegate to the nodemaker's create_immutable_directory() and return
    whatever it returns."""
    maker = self.nodemaker
    return maker.create_immutable_directory(children, convergence)
def create_mutable_file(self, contents=None, keysize=None):
    """Delegate to the nodemaker's create_mutable_file() and return
    whatever it returns."""
    maker = self.nodemaker
    return maker.create_mutable_file(contents, keysize)
def upload(self, uploadable):
    """Hand uploadable to the service named "uploader", passing my history
    object along, and return the uploader's result."""
    return self.getServiceNamed("uploader").upload(
        uploadable, history=self.get_history())