import os, stat, time

from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata # for __full_version__
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient
from allmydata.nodemaker import NodeMaker

KiB = 1024 # used by DEFAULT_ENCODING_PARAMETERS below
class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    # secrets are 256-bit random strings, stored base32-encoded with a
    # trailing newline so they read cleanly as one-line private config files
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret
55 """I create RSA keys for mutable files. Each call to generate() returns a
56 single keypair. The keysize is specified first by the keysize= argument
57 to generate(), then with a default set by set_default_keysize(), then
58 with a built-in default of 2048 bits."""
61 self.default_keysize = 2048
63 def set_remote_generator(self, keygen):
65 def set_default_keysize(self, keysize):
66 """Call this to override the size of the RSA keys created for new
67 mutable files which don't otherwise specify a size. This will affect
68 all subsequent calls to generate() without a keysize= argument. The
69 default size is 2048 bits. Test cases should call this method once
70 during setup, to cause me to create smaller (522 bit) keys, so the
71 unit tests run faster."""
72 self.default_keysize = keysize
    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (522 bit keys are fast for testing,
        2048 bit keys are standard). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and
            # 3.2 seconds, and is done synchronously here
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
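
# A minimal usage sketch (illustrative, not part of the module). generate()
# returns a Deferred whether the keypair comes from a remote generator or is
# computed locally:
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)   # e.g. in a test setup, for speed
#   d = kg.generate()
#   def _got_keys((verifying_key, signing_key)):
#       pass # hand the keypair to whatever is creating the mutable file
#   d.addCallback(_got_keys)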
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = "storage"
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # These are the default erasure-coding parameters. 'k' ("needed") is the
    # number of shares required to reconstruct a file. 'happy' ("desired")
    # means that we will abort an upload unless we can allocate space for at
    # least this many shares. 'n' ("total") is the total number of shares
    # created by encoding. If everybody has room then this is how many we
    # will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
    def read_old_config_files(self):
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        # IStatsProducer: the stats provider polls this for our counters
        return { 'node.uptime': time.time() - self.started_timestamp }
    def init_secrets(self):
        # the lease secret is used to derive per-server renewal and
        # cancellation secrets; the convergence secret keys the convergent
        # encryption of immutable files
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)
    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
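
    # A representative tahoe.cfg [storage] section covering the options read
    # above (the values are illustrative only):
    #
    #   [storage]
    #   enabled = true
    #   readonly = false
    #   reserved_space = 1G        # parsed by parse_abbreviated_size
    #   expire.enabled = true
    #   expire.mode = age          # or cutoff-date
    #   expire.override_lease_duration = 7days   # parsed by parse_duration
    #   expire.immutable = true
    #   expire.mutable = true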
    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache_dirman.setServiceParent(self)
        self.downloader = Downloader(self.storage_broker, self.stats_provider)
        self.init_stub_client()
        self.init_nodemaker()
    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
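
    # If the commented-out block above were enabled, the static server
    # syntax it parses would look roughly like this (hypothetical, since the
    # feature is "not quite ready yet"):
    #
    #   [client-server-selection]
    #   server.SERVERID.type = tahoe-foolscap
    #   use_introducer = true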
    def get_storage_broker(self):
        return self.storage_broker
    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def init_nodemaker(self):
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.downloader,
                                   self.download_cache_dirman,
                                   self.get_encoding_parameters(),
                                   self._key_generator)

    def get_history(self):
        return self.history
    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)
    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
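
    # web.port is a Twisted strports string, so a typical tahoe.cfg entry
    # looks like (illustrative):
    #
    #   [node]
    #   web.port = tcp:3456:interface=127.0.0.1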
    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
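
    # The option names read above map to a tahoe.cfg section like the
    # following (illustrative values; the key-file paths are assumptions of
    # this sketch, not defaults):
    #
    #   [sftpd]
    #   enabled = true
    #   port = 8022
    #   accounts.file = private/accounts
    #   host_pubkey_file = private/ssh_host_rsa_key.pub
    #   host_privkey_file = private/ssh_host_rsa_key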
    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                # the file has been touched recently, so keep running
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_all_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
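
    # Illustrative test usage (assumes a test grid with five storage
    # servers; begin_upload() is a stand-in for the test's next step):
    #
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: begin_upload())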
    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri;
        # instead we may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}):
        d = self.nodemaker.create_new_mutable_directory(initial_children)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None):
        return self.nodemaker.create_mutable_file(contents, keysize)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
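
# Illustrative caller-side usage of the primitives above (a sketch, not part
# of the module): upload some bytes, then turn the resulting URI into a
# filenode. 'results.uri' assumes the UploadResults shape used elsewhere in
# this codebase.
#
#   from allmydata.immutable.upload import Data
#   d = client.upload(Data("some bytes", convergence=client.convergence))
#   d.addCallback(lambda results: client.create_node_from_uri(results.uri))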