2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.abbreviate import parse_abbreviated_size
21 from allmydata.util.time_format import parse_duration, parse_date
22 from allmydata.stats import StatsProvider
23 from allmydata.history import History
24 from allmydata.interfaces import IStatsProducer, RIStubClient
25 from allmydata.nodemaker import NodeMaker
class StubClient(Referenceable):
    # A contentless referenceable: published to the introducer (see
    # init_stub_client below) so it can count connected clients. The
    # RIStubClient interface declares no methods we can see here.
    implements(RIStubClient)
38 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
def __init__(self, lease_secret):
    # SecretHolder method (class line elided above): keep the node-wide
    # lease secret private; the accessors below derive per-purpose secrets.
    self._lease_secret = lease_secret
def get_renewal_secret(self):
    """Derive and return this node's lease-renewal secret."""
    secret = self._lease_secret
    return hashutil.my_renewal_secret_hash(secret)
def get_cancel_secret(self):
    """Derive and return this node's lease-cancel secret."""
    secret = self._lease_secret
    return hashutil.my_cancel_secret_hash(secret)
53 self.default_keysize = 2048
55 def set_remote_generator(self, keygen):
def set_default_keysize(self, keysize):
    """Override the size of the RSA keys created for new mutable files.

    Passing None lets mutable.filenode choose its own size, which
    means 2048 bits.
    """
    self.default_keysize = keysize
def generate(self, keysize=None):
    # Return a Deferred that fires with a (verifying_key, signing_key)
    # pair, using the remote key-generator service when available and
    # local pycryptopp generation otherwise.
    keysize = keysize or self.default_keysize
    # NOTE(review): a line is elided here — presumably `if self._remote:`
    # guarding the remote path below.
    d = self._remote.callRemote('get_rsa_key_pair', keysize)
    # py2-only tuple parameter: unpacks the (verifying, signing) strings
    def make_key_objs((verifying_key, signing_key)):
        # deserialize the wire-format strings into pycryptopp key objects
        v = rsa.create_verifying_key_from_string(verifying_key)
        s = rsa.create_signing_key_from_string(signing_key)
        # NOTE(review): elided line here — presumably `return (v, s)`
    d.addCallback(make_key_objs)
    # NOTE(review): the `return d` and the `else:` introducing the local
    # path are elided from this chunk.
    # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
    # (this comment continues on an elided line)
    signer = rsa.generate(keysize)
    verifier = signer.get_verifying_key()
    return defer.succeed( (verifier, signer) )
class Client(node.Node, pollmixin.PollMixin):
    # The Tahoe client node: storage client/server, uploader/downloader,
    # web/FTP/SFTP frontends. NOTE(review): several class-constant lines are
    # elided from this chunk (e.g. STOREDIR, which init_storage reads).
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    # touching this file periodically keeps the node alive; see
    # _check_hotline below
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   # NOTE(review): the "happy" and "n"
                                   # entries and the closing brace are elided
                                   # from this chunk; init_client reads both.
                                   "max_segment_size": 128*KiB,
def __init__(self, basedir="."):
    # Construct the client node rooted at `basedir` and start its
    # subsystems. NOTE(review): several lines of this constructor are
    # elided from this chunk (e.g. the init_storage/init_client calls, the
    # helper-enabled branch body, and a guard around init_web); comments
    # below describe only the visible statements.
    node.Node.__init__(self, basedir)
    self.started_timestamp = time.time()  # feeds node.uptime in get_stats
    self.logSource="Client"
    # per-instance copy so init_client can override k/n/happy from config
    # without mutating the class-level defaults
    self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
    self.init_introducer_client()
    self.init_stats_provider()
    self.init_lease_secret()
    if self.get_config("helper", "enabled", False, boolean=True):
        # NOTE(review): the branch body is elided here (presumably
        # arranging for init_helper after Tub startup)
    self._key_generator = KeyGenerator()
    key_gen_furl = self.get_config("client", "key_generator.furl", None)
    # NOTE(review): an elided line above presumably guards on key_gen_furl
    self.init_key_gen(key_gen_furl)
    # ControlServer and Helper are attached after Tub startup
    self.init_ftp_server()
    self.init_sftp_server()
    hotline_file = os.path.join(self.basedir,
                                self.SUICIDE_PREVENTION_HOTLINE_FILE)
    # if a hotline file exists, shut down automatically once it goes stale
    # (see _check_hotline); used by test harnesses
    if os.path.exists(hotline_file):
        age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
        self.log("hotline file noticed (%ds old), starting timer" % age)
        hotline = TimerService(1.0, self._check_hotline, hotline_file)
        hotline.setServiceParent(self)

    # this needs to happen last, so it can use getServiceNamed() to
    # acquire references to StorageServer and other web-statusable things
    webport = self.get_config("node", "web.port", None)
    # NOTE(review): a guard line (presumably `if webport:`) is elided here
    self.init_web(webport) # strports string
def read_old_config_files(self):
    """Migrate pre-tahoe.cfg configuration.

    Copies the legacy per-item furl files into the [client] section and
    translates legacy flag files (whose mere presence enabled a feature)
    into explicit config settings.
    """
    node.Node.read_old_config_files(self)
    copy = self._copy_config_from_file
    for furlfile in ("introducer.furl", "helper.furl",
                     "key_generator.furl", "stats_gatherer.furl"):
        copy(furlfile, "client", furlfile)
    # each flag file, when present in basedir, forces one setting
    flag_files = (("no_storage", "storage", "enabled", "false"),
                  ("readonly_storage", "storage", "readonly", "true"),
                  ("debug_discard_storage", "storage", "debug_discard", "true"),
                  ("run_helper", "helper", "enabled", "true"))
    for flagname, section, option, value in flag_files:
        if os.path.exists(os.path.join(self.basedir, flagname)):
            self.set_config(section, option, value)
def init_introducer_client(self):
    # Create the IntroducerClient (introducer.furl is mandatory: no default
    # is passed to get_config) and start it once the Tub is up.
    self.introducer_furl = self.get_config("client", "introducer.furl")
    ic = IntroducerClient(self.tub, self.introducer_furl,
                          # NOTE(review): one argument line is elided here
                          str(allmydata.__full_version__),
                          str(self.OLDEST_SUPPORTED_VERSION))
    self.introducer_client = ic
    # hold off on starting the IntroducerClient until our tub has been
    # started, so we'll have a useful address on our RemoteReference, so
    # that the introducer's status page will show us.
    d = self.when_tub_ready()
    def _start_introducer_client(res):
        ic.setServiceParent(self)
    d.addCallback(_start_introducer_client)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="URyI5w")
def init_stats_provider(self):
    """Create and attach the StatsProvider service, then register this
    node as a producer so get_stats() is polled for node.uptime."""
    gatherer = self.get_config("client", "stats_gatherer.furl", None)
    provider = StatsProvider(self, gatherer)
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
179 return { 'node.uptime': time.time() - self.started_timestamp }
def init_lease_secret(self):
    """Load (creating on first run) the node's private lease secret and
    wrap it in a SecretHolder for deriving renew/cancel secrets."""
    encoded = self.get_or_create_private_config("secret", _make_secret)
    self._secret_holder = SecretHolder(base32.a2b(encoded))
def init_storage(self):
    # Create the StorageServer from the [storage] config section and
    # publish its FURL via the introducer once the Tub is ready.
    # NOTE(review): many lines of this method are elided from this chunk —
    # the early return when storage is disabled, the try/except around
    # reserved-space parsing, the if/else making expire.mode required only
    # when expiration is enabled, the `sharetypes = []` setup, the
    # add_service call, and the `def _publish(res):` line. Comments below
    # describe only the visible statements.
    # should we run a storage server (and publish it for others to use)?
    if not self.get_config("storage", "enabled", True, boolean=True):
    readonly = self.get_config("storage", "readonly", False, boolean=True)
    storedir = os.path.join(self.basedir, self.STOREDIR)
    # reserved_space uses human-friendly suffixes (e.g. "5G"); parsed to a
    # byte count below
    data = self.get_config("storage", "reserved_space", None)
    reserved = parse_abbreviated_size(data)
    log.msg("[storage]reserved_space= contains unparseable value %s"
    discard = self.get_config("storage", "debug_discard", False,
    # lease-expiration ("garbage collection") settings
    expire = self.get_config("storage", "expire.enabled", False, boolean=True)
    mode = self.get_config("storage", "expire.mode") # require a mode
    mode = self.get_config("storage", "expire.mode", "age")
    o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
    if o_l_d is not None:
        # duration string (e.g. "31days") parsed into seconds
        o_l_d = parse_duration(o_l_d)
    if mode == "cutoff-date":
        # cutoff_date is mandatory in cutoff-date mode (no default passed)
        cutoff_date = self.get_config("storage", "expire.cutoff_date")
        cutoff_date = parse_date(cutoff_date)
    if self.get_config("storage", "expire.immutable", True, boolean=True):
        sharetypes.append("immutable")
    if self.get_config("storage", "expire.mutable", True, boolean=True):
        sharetypes.append("mutable")
    expiration_sharetypes = tuple(sharetypes)
    ss = StorageServer(storedir, self.nodeid,
                       reserved_space=reserved,
                       discard_storage=discard,
                       readonly_storage=readonly,
                       stats_provider=self.stats_provider,
                       expiration_enabled=expire,
                       expiration_mode=mode,
                       expiration_override_lease_duration=o_l_d,
                       expiration_cutoff_date=cutoff_date,
                       expiration_sharetypes=expiration_sharetypes)
    d = self.when_tub_ready()
    # we can't do registerReference until the Tub is ready
    # NOTE(review): the `def _publish(res):` line is elided here; the next
    # four lines form that callback's body
    furl_file = os.path.join(self.basedir, "private", "storage.furl")
    furl = self.tub.registerReference(ss, furlFile=furl_file)
    ri_name = RIStorageServer.__remote_name__
    self.introducer_client.publish(furl, "storage", ri_name)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="aLGBKw")
def init_client(self):
    # Configure the client side: erasure-coding parameters, convergence
    # secret, storage broker, history, and the upload/download services.
    helper_furl = self.get_config("client", "helper.furl", None)
    DEP = self.DEFAULT_ENCODING_PARAMETERS
    # shares.* settings override the (per-instance) encoding defaults
    DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
    DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
    DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
    # the convergence secret makes CHK encryption keys unique per client
    convergence_s = self.get_or_create_private_config('convergence', _make_secret)
    self.convergence = base32.a2b(convergence_s)
    # NOTE(review): one line is elided here
    self.init_client_storage_broker()
    self.history = self.add_service(History(self.stats_provider))
    self.add_service(Uploader(helper_furl, self.stats_provider))
    download_cachedir = os.path.join(self.basedir,
                                     "private", "cache", "download")
    self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
    self.download_cache_dirman.setServiceParent(self)
    self.add_service(Downloader(self.stats_provider))
    self.init_stub_client()
    self.init_nodemaker()
def init_client_storage_broker(self):
    # Build the StorageFarmBroker through which Uploader/Downloader (and
    # everybody else) reach storage servers.
    # NOTE(review): three lines are elided from this chunk (apparently
    # blank lines and one line of the commented-out block below).
    # create a StorageFarmBroker object, for use by Uploader/Downloader
    # (and everybody else who wants to use storage servers)
    sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
    self.storage_broker = sb

    # load static server specifications from tahoe.cfg, if any.
    # Not quite ready yet.
    #if self.config.has_section("client-server-selection"):
    #    server_params = {} # maps serverid to dict of parameters
    #    for (name, value) in self.config.items("client-server-selection"):
    #        pieces = name.split(".")
    #        if pieces[0] == "server":
    #            serverid = pieces[1]
    #            if serverid not in server_params:
    #                server_params[serverid] = {}
    #            server_params[serverid][pieces[2]] = value
    #    for serverid, params in server_params.items():
    #        server_type = params.pop("type")
    #        if server_type == "tahoe-foolscap":
    #            s = storage_client.NativeStorageClient(*params)
    #            msg = ("unrecognized server type '%s' in "
    #                   "tahoe.cfg [client-server-selection]server.%s.type"
    #                   % (server_type, serverid))
    #            raise storage_client.UnknownServerTypeError(msg)
    #        sb.add_server(s.serverid, s)

    # check to see if we're supposed to use the introducer too
    if self.get_config("client-server-selection", "use_introducer",
                       default=True, boolean=True):
        sb.use_introducer(self.introducer_client)
def get_storage_broker(self):
    """Accessor for the StorageFarmBroker built in
    init_client_storage_broker."""
    broker = self.storage_broker
    return broker
def init_stub_client(self):
    # Publish an empty StubClient object so the introducer can observe us.
    # NOTE(review): lines are elided here — presumably a config guard, the
    # `sc = StubClient()` construction, the tail of the comment below, and
    # the `def _publish(res):` line wrapping the next three statements.
    # we publish an empty object so that the introducer can count how
    # many clients are connected and see what versions they're
    furl = self.tub.registerReference(sc)
    ri_name = RIStubClient.__remote_name__
    self.introducer_client.publish(furl, "stub_client", ri_name)
    d = self.when_tub_ready()
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="OEHq3g")
def init_nodemaker(self):
    # Build the NodeMaker: the factory used by create_node_from_uri,
    # create_empty_dirnode and create_mutable_file below.
    self.nodemaker = NodeMaker(self.storage_broker,
                               # NOTE(review): two argument lines are
                               # elided here
                               self.getServiceNamed("uploader"),
                               self.getServiceNamed("downloader"),
                               self.download_cache_dirman,
                               self.get_encoding_parameters(),
                               # NOTE(review): the trailing arguments and
                               # closing paren are elided from this chunk
def get_history(self):
    """Return the History service registered in init_client."""
    return self.getServiceNamed("history")
def init_control(self):
    # After Tub startup, create the ControlServer, register it, and write
    # its FURL to BASEDIR/private/control.furl (private: grants control).
    d = self.when_tub_ready()
    # NOTE(review): lines are elided here — presumably the
    # `def _publish(res):` line and the ControlServer construction.
    c.setServiceParent(self)
    control_url = self.tub.registerReference(c)
    self.write_private_config("control.furl", control_url + "\n")
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="d3tNXA")
def init_helper(self):
    # After Tub startup, create the upload Helper service and register its
    # FURL under BASEDIR/private/helper.furl.
    d = self.when_tub_ready()
    # NOTE(review): the `def _publish(res):` line is elided here; the rest
    # of this method is that callback's body.
    h = Helper(os.path.join(self.basedir, "helper"),
               self.stats_provider, self.history)
    h.setServiceParent(self)
    # TODO: this is confusing. BASEDIR/private/helper.furl is created
    # by the helper. BASEDIR/helper.furl is consumed by the client
    # who wants to use the helper. I like having the filename be the
    # same, since that makes 'cp' work smoothly, but the difference
    # between config inputs and generated outputs is hard to see.
    helper_furlfile = os.path.join(self.basedir,
                                   "private", "helper.furl")
    self.tub.registerReference(h, furlFile=helper_furlfile)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="K0mW5w")
def init_key_gen(self, key_gen_furl):
    """Once the Tub is ready, connect to the remote key-generator service.

    key_gen_furl: FURL of the key-generator; _got_key_generator runs on
    each (re)connection, _lost_key_generator on disconnect.
    """
    d = self.when_tub_ready()
    # The original spelled this `def _subscribe(self):`, shadowing `self`
    # with the deferred's result (apparently the node itself — that is what
    # made the attribute accesses work). Renaming the parameter is a pure,
    # behavior-identical change that removes the misleading shadowing.
    def _subscribe(node):
        node.tub.connectTo(key_gen_furl, node._got_key_generator)
    d.addCallback(_subscribe)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="z9DMzw")
def _got_key_generator(self, key_generator):
    """Connection to the remote key generator established: hand the remote
    reference to our KeyGenerator and arrange to notice disconnects."""
    remote = key_generator
    self._key_generator.set_remote_generator(remote)
    remote.notifyOnDisconnect(self._lost_key_generator)
def _lost_key_generator(self):
    """The remote key generator disconnected; clear the remote reference so
    key generation falls back to the local path."""
    self._key_generator.set_remote_generator(None)
def set_default_mutable_keysize(self, keysize):
    """Forward the default mutable-file RSA key size to our KeyGenerator."""
    self._key_generator.set_default_keysize(keysize)
def init_web(self, webport):
    # Create the web status/UI server listening on `webport` (a twisted
    # strports string, from [node]web.port).
    self.log("init_web(webport=%s)", args=(webport,))
    # imported lazily; presumably to defer the cost/dependency until the
    # web server is actually wanted — TODO confirm
    from allmydata.webish import WebishServer
    nodeurl_path = os.path.join(self.basedir, "node.url")
    staticdir = self.get_config("node", "web.static", "public_html")
    staticdir = os.path.expanduser(staticdir)  # may contain ~
    ws = WebishServer(self, webport, nodeurl_path, staticdir)
    # NOTE(review): the tail of this method is elided from this chunk
    # (presumably attaching `ws` as a child service).
def init_ftp_server(self):
    """Start the FTP frontend when [ftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    portstr = self.get_config("ftpd", "port", "8021")
    # imported only when enabled, matching the original's lazy import
    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, portstr)
    server.setServiceParent(self)
def init_sftp_server(self):
    """Start the SFTP frontend when [sftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    portstr = self.get_config("sftpd", "port", "8022")
    # host key paths are mandatory (no default passed)
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")
    # imported only when enabled, matching the original's lazy import
    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
def _check_hotline(self, hotline_file):
    # TimerService callback (fires every 1.0s; see __init__): keep the node
    # alive only while the hotline file exists and stays freshly touched.
    # NOTE(review): the actual shutdown call and a `return`/`else:` around
    # the branches are elided from this chunk; only the log lines survive.
    if os.path.exists(hotline_file):
        mtime = os.stat(hotline_file)[stat.ST_MTIME]
        # fresh = touched within the last 120 seconds
        if mtime > time.time() - 120.0:
        self.log("hotline file too old, shutting down")
    self.log("hotline file missing, shutting down")
def get_encoding_parameters(self):
    """Return this node's erasure-coding parameter dict (the per-instance
    copy, possibly overridden from config in init_client)."""
    params = self.DEFAULT_ENCODING_PARAMETERS
    return params
def connected_to_introducer(self):
    # Delegate to the IntroducerClient's own connection state.
    # NOTE(review): the fallback return (presumably `return False` when
    # there is no introducer client) is elided from this chunk.
    if self.introducer_client:
        return self.introducer_client.connected_to_introducer()
def get_renewal_secret(self): # this will go away
    """Legacy accessor: delegate to the SecretHolder created in
    init_lease_secret. Scheduled for removal."""
    holder = self._secret_holder
    return holder.get_renewal_secret()
def get_cancel_secret(self):
    """Legacy accessor: delegate to the SecretHolder created in
    init_lease_secret."""
    holder = self._secret_holder
    return holder.get_cancel_secret()
def debug_wait_for_client_connections(self, num_clients):
    """Return a Deferred that fires (with None) when we have connections
    to the given number of peers. Useful for tests that set up a
    temporary test network and need to know when it is safe to proceed
    with an upload or download."""
    # NOTE(review): the `def _check():` line and the trailing `return d`
    # are elided from this chunk; the predicate below is _check's body,
    # polled every 0.5s via PollMixin.poll.
    return len(self.storage_broker.get_all_servers()) >= num_clients
    d = self.poll(_check, 0.5)
    d.addCallback(lambda res: None)
451 # these four methods are the primitives for creating filenodes and
452 # dirnodes. The first takes a URI and produces a filenode or (new-style)
453 # dirnode. The other three create brand-new filenodes/dirnodes.
def create_node_from_uri(self, writecap, readcap=None):
    """Synchronously turn a writecap (and optional readcap) into a
    filenode or dirnode via the NodeMaker."""
    maker = self.nodemaker
    return maker.create_from_cap(writecap, readcap)
def create_empty_dirnode(self):
    """Create a brand-new (mutable) directory node via the NodeMaker."""
    maker = self.nodemaker
    return maker.create_new_mutable_directory()
def create_mutable_file(self, contents="", keysize=None):
    """Create a brand-new mutable file holding `contents`; `keysize`
    optionally fixes the RSA key size (None = let the nodemaker choose)."""
    maker = self.nodemaker
    return maker.create_mutable_file(contents, keysize)
def upload(self, uploadable):
    """Hand `uploadable` to the Uploader service, recording the operation
    in this node's history; returns whatever the uploader returns."""
    svc = self.getServiceNamed("uploader")
    history = self.get_history()
    return svc.upload(uploadable, history=history)