2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.abbreviate import parse_abbreviated_size
21 from allmydata.util.time_format import parse_duration, parse_date
22 from allmydata.stats import StatsProvider
23 from allmydata.history import History
24 from allmydata.interfaces import IStatsProducer, RIStubClient
25 from allmydata.nodemaker import NodeMaker
class StubClient(Referenceable):
    # Empty remotely-referenceable object, published to the introducer so it
    # can count connected clients; advertises the RIStubClient schema.
    implements(RIStubClient)

# NOTE(review): the following 'return' looks like the body of a module-level
# secret-making helper whose 'def' line is missing from this chunk -- it
# yields a base32-encoded random value (plus newline) suitable for writing
# into a private config file. TODO confirm against the full file.
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
41 def __init__(self, lease_secret):
42 self._lease_secret = lease_secret
44 def get_renewal_secret(self):
45 return hashutil.my_renewal_secret_hash(self._lease_secret)
47 def get_cancel_secret(self):
48 return hashutil.my_cancel_secret_hash(self._lease_secret)
# NOTE(review): the next line looks like the tail of a key-generator
# constructor whose 'def __init__' line is missing from this chunk;
# 2048 is the default RSA modulus size, in bits.
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        # NOTE(review): the method body is missing from this chunk --
        # presumably it stored 'keygen' (a remote key-generator reference,
        # or None on disconnect) for generate() to consult. TODO confirm.
57 def set_default_keysize(self, keysize):
58 """Call this to override the size of the RSA keys created for new
59 mutable files. The default of None means to let mutable.filenode
60 choose its own size, which means 2048 bits."""
61 self.default_keysize = keysize
    def generate(self, keysize=None):
        """Produce an RSA (verifying_key, signing_key) pair of 'keysize'
        bits, defaulting to self.default_keysize; result is delivered via
        a Deferred.

        NOTE(review): the branch structure is missing from this chunk --
        presumably an 'if self._remote:' guard around the callRemote path,
        a 'return v, s' at the end of make_key_objs, and a 'return d' /
        'else:' before the local rsa.generate() fallback. TODO confirm
        against the full file.
        """
        keysize = keysize or self.default_keysize
        d = self._remote.callRemote('get_rsa_key_pair', keysize)
        def make_key_objs((verifying_key, signing_key)):
            # deserialize the wire strings back into pycryptopp key objects
            v = rsa.create_verifying_key_from_string(verifying_key)
            s = rsa.create_signing_key_from_string(signing_key)
        d.addCallback(make_key_objs)
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        signer = rsa.generate(keysize)
        verifier = signer.get_verifying_key()
        return defer.succeed( (verifier, signer) )
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: connects to the introducer and starts the
    storage, upload/download, web, FTP and SFTP services."""
    implements(IStatsProducer)

    # file (under basedir) recording the port we listen on
    PORTNUMFILE = "client.port"
    # while this file exists and is fresh, _check_hotline keeps us running
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    # NOTE(review): the 'happy'/'n' entries and the closing brace of this
    # dict appear to be missing from this chunk; 'KiB' is presumably a
    # module-level constant defined on a line outside this view.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        """Initialize the client node rooted at 'basedir' and start its
        sub-services in dependency order."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # feeds the node.uptime stat
        self.logSource="Client"
        # take a per-instance copy so config overrides (in init_client) do
        # not mutate the class-level defaults shared by other instances
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # NOTE(review): calls that usually follow here (presumably
        # init_storage()/init_control()) appear to be missing from this chunk.
        if self.get_config("helper", "enabled", False, boolean=True):
            # NOTE(review): the body of this 'if' (presumably
            # self.init_helper()) is missing from this chunk.
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        # NOTE(review): an 'if key_gen_furl:' guard appears to be missing
        # before this call. TODO confirm.
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        # NOTE(review): an 'if webport:' guard appears to be missing here.
        self.init_web(webport) # strports string
140 def read_old_config_files(self):
141 node.Node.read_old_config_files(self)
142 copy = self._copy_config_from_file
143 copy("introducer.furl", "client", "introducer.furl")
144 copy("helper.furl", "client", "helper.furl")
145 copy("key_generator.furl", "client", "key_generator.furl")
146 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
147 if os.path.exists(os.path.join(self.basedir, "no_storage")):
148 self.set_config("storage", "enabled", "false")
149 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
150 self.set_config("storage", "readonly", "true")
151 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
152 self.set_config("storage", "debug_discard", "true")
153 if os.path.exists(os.path.join(self.basedir, "run_helper")):
154 self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        """Create the IntroducerClient and arrange to start it once our Tub
        is ready.

        NOTE(review): an argument (presumably self.nickname) appears to be
        missing from the IntroducerClient constructor call in this chunk.
        """
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
173 def init_stats_provider(self):
174 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
175 self.stats_provider = StatsProvider(self, gatherer_furl)
176 self.add_service(self.stats_provider)
177 self.stats_provider.register_producer(self)
180 return { 'node.uptime': time.time() - self.started_timestamp }
182 def init_lease_secret(self):
183 secret_s = self.get_or_create_private_config("secret", _make_secret)
184 lease_secret = base32.a2b(secret_s)
185 self._secret_holder = SecretHolder(lease_secret)
    def init_storage(self):
        """Create our StorageServer (honoring the [storage] section of
        tahoe.cfg) and publish it to the introducer once the Tub is ready.

        NOTE(review): several lines appear to be missing from this chunk:
        the early 'return' when storage is disabled, the try/except around
        reserved-space parsing, the closing arguments of two get_config
        calls, the initialization of 'sharetypes', the self.add_service(ss)
        call, and the 'def _publish(res):' wrapper whose body is the
        registerReference/publish sequence below. TODO confirm against the
        full file before relying on this function.
        """
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        data = self.get_config("storage", "reserved_space", None)
        reserved = parse_abbreviated_size(data)
        log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        mode = self.get_config("storage", "expire.mode") # require a mode
        mode = self.get_config("storage", "expire.mode", "age")
        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            # e.g. "31days" -> seconds
            o_l_d = parse_duration(o_l_d)
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        furl_file = os.path.join(self.basedir, "private", "storage.furl")
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
252 def init_client(self):
253 helper_furl = self.get_config("client", "helper.furl", None)
254 DEP = self.DEFAULT_ENCODING_PARAMETERS
255 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
256 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
257 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
258 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
259 self.convergence = base32.a2b(convergence_s)
261 self.init_client_storage_broker()
262 self.history = History(self.stats_provider)
263 self.add_service(Uploader(helper_furl, self.stats_provider))
264 download_cachedir = os.path.join(self.basedir,
265 "private", "cache", "download")
266 self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
267 self.download_cache_dirman.setServiceParent(self)
268 self.downloader = Downloader(self.storage_broker, self.stats_provider)
269 self.init_stub_client()
270 self.init_nodemaker()
272 def init_client_storage_broker(self):
273 # create a StorageFarmBroker object, for use by Uploader/Downloader
274 # (and everybody else who wants to use storage servers)
275 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
276 self.storage_broker = sb
278 # load static server specifications from tahoe.cfg, if any.
279 # Not quite ready yet.
280 #if self.config.has_section("client-server-selection"):
281 # server_params = {} # maps serverid to dict of parameters
282 # for (name, value) in self.config.items("client-server-selection"):
283 # pieces = name.split(".")
284 # if pieces[0] == "server":
285 # serverid = pieces[1]
286 # if serverid not in server_params:
287 # server_params[serverid] = {}
288 # server_params[serverid][pieces[2]] = value
289 # for serverid, params in server_params.items():
290 # server_type = params.pop("type")
291 # if server_type == "tahoe-foolscap":
292 # s = storage_client.NativeStorageClient(*params)
294 # msg = ("unrecognized server type '%s' in "
295 # "tahoe.cfg [client-server-selection]server.%s.type"
296 # % (server_type, serverid))
297 # raise storage_client.UnknownServerTypeError(msg)
298 # sb.add_server(s.serverid, s)
300 # check to see if we're supposed to use the introducer too
301 if self.get_config("client-server-selection", "use_introducer",
302 default=True, boolean=True):
303 sb.use_introducer(self.introducer_client)
305 def get_storage_broker(self):
306 return self.storage_broker
    def init_stub_client(self):
        """Publish an empty StubClient so the introducer can count connected
        clients and see what versions they're running.

        NOTE(review): the 'sc = StubClient()' construction and the
        'def _publish(res):' wrapper appear to be missing from this chunk;
        the registerReference/publish lines below were presumably the body
        of _publish. TODO confirm against the full file.
        """
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        furl = self.tub.registerReference(sc)
        ri_name = RIStubClient.__remote_name__
        self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def init_nodemaker(self):
        """Create the NodeMaker used by the create_* methods below.

        NOTE(review): several constructor arguments (and the closing
        parenthesis) appear to be missing from this chunk -- presumably the
        secret holder, history, downloader, and key generator. TODO confirm.
        """
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self.getServiceNamed("uploader"),
                                   self.download_cache_dirman,
                                   self.get_encoding_parameters(),
332 def get_history(self):
    def init_control(self):
        """Attach a ControlServer once the Tub is ready and record its FURL
        in private/control.furl.

        NOTE(review): the 'def _publish(res):' line and the ControlServer()
        construction appear to be missing from this chunk; the three lines
        before addCallback were presumably _publish's body. TODO confirm.
        """
        d = self.when_tub_ready()
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """Create the upload Helper once the Tub is ready and register it at
        private/helper.furl.

        NOTE(review): the 'def _publish(res):' line appears to be missing
        from this chunk; the lines below were presumably its body.
        """
        d = self.when_tub_ready()
        self.helper = Helper(os.path.join(self.basedir, "helper"),
                             self.storage_broker, self._secret_holder,
                             self.stats_provider, self.history)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl")
        self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
364 def init_key_gen(self, key_gen_furl):
365 d = self.when_tub_ready()
366 def _subscribe(self):
367 self.tub.connectTo(key_gen_furl, self._got_key_generator)
368 d.addCallback(_subscribe)
369 d.addErrback(log.err, facility="tahoe.init",
370 level=log.BAD, umid="z9DMzw")
372 def _got_key_generator(self, key_generator):
373 self._key_generator.set_remote_generator(key_generator)
374 key_generator.notifyOnDisconnect(self._lost_key_generator)
376 def _lost_key_generator(self):
377 self._key_generator.set_remote_generator(None)
379 def set_default_mutable_keysize(self, keysize):
380 self._key_generator.set_default_keysize(keysize)
    def init_web(self, webport):
        """Start the web frontend listening on 'webport' (a strports string).

        NOTE(review): the tail of this method appears to be missing from
        this chunk (presumably self.add_service(ws)); as written the
        WebishServer is created but never attached/started. TODO confirm.
        """
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
392 def init_ftp_server(self):
393 if self.get_config("ftpd", "enabled", False, boolean=True):
394 accountfile = self.get_config("ftpd", "accounts.file", None)
395 accounturl = self.get_config("ftpd", "accounts.url", None)
396 ftp_portstr = self.get_config("ftpd", "port", "8021")
398 from allmydata.frontends import ftpd
399 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
400 s.setServiceParent(self)
402 def init_sftp_server(self):
403 if self.get_config("sftpd", "enabled", False, boolean=True):
404 accountfile = self.get_config("sftpd", "accounts.file", None)
405 accounturl = self.get_config("sftpd", "accounts.url", None)
406 sftp_portstr = self.get_config("sftpd", "port", "8022")
407 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
408 privkey_file = self.get_config("sftpd", "host_privkey_file")
410 from allmydata.frontends import sftpd
411 s = sftpd.SFTPServer(self, accountfile, accounturl,
412 sftp_portstr, pubkey_file, privkey_file)
413 s.setServiceParent(self)
    def _check_hotline(self, hotline_file):
        """TimerService hook: keep running only while the hotline file has
        been touched within the last two minutes.

        NOTE(review): the branch bodies are incomplete in this chunk -- the
        early 'return' for a fresh file, the 'else:' lines, and the shutdown
        calls (presumably reactor.stop()) after the two log messages appear
        to be missing. TODO confirm against the full file.
        """
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
            self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
426 def get_encoding_parameters(self):
427 return self.DEFAULT_ENCODING_PARAMETERS
429 def connected_to_introducer(self):
430 if self.introducer_client:
431 return self.introducer_client.connected_to_introducer()
434 def get_renewal_secret(self): # this will go away
435 return self._secret_holder.get_renewal_secret()
437 def get_cancel_secret(self):
438 return self._secret_holder.get_cancel_secret()
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the 'def _check():' line (and probably a trailing
        # 'return d') appear to be missing from this chunk; the next line
        # was presumably _check's body, polled every 0.5s below.
        return len(self.storage_broker.get_all_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
452 # these four methods are the primitives for creating filenodes and
453 # dirnodes. The first takes a URI and produces a filenode or (new-style)
454 # dirnode. The other three create brand-new filenodes/dirnodes.
456 def create_node_from_uri(self, writecap, readcap=None):
457 # this returns synchronously.
458 return self.nodemaker.create_from_cap(writecap, readcap)
460 def create_dirnode(self, initial_children={}):
461 d = self.nodemaker.create_new_mutable_directory(initial_children)
464 def create_mutable_file(self, contents=None, keysize=None):
465 return self.nodemaker.create_mutable_file(contents, keysize)
467 def upload(self, uploadable):
468 uploader = self.getServiceNamed("uploader")
469 return uploader.upload(uploadable, history=self.get_history())