1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.util.time_format import parse_duration, parse_date
24 from allmydata.uri import LiteralFileURI
25 from allmydata.dirnode import NewDirectoryNode
26 from allmydata.mutable.filenode import MutableFileNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    # A do-nothing remote object: published to the introducer (see
    # init_stub_client below) purely so it can count connected clients
    # and observe their versions.
    implements(RIStubClient)
42 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    # file (under basedir) recording the allocated Tub listening port
    PORTNUMFILE = "client.port"
    # while this file exists and is fresh, a TimerService keeps the node
    # alive; see _check_hotline
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    # NOTE(review): the remaining entries of this dict ("happy", "n", and the
    # closing brace) are not visible in this chunk.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir=".").
        node.Node.__init__(self, basedir)
        # record startup time so get_stats can report node.uptime
        self.started_timestamp = time.time()
        self.logSource="Client"
        # copy the class-level dict so tahoe.cfg overrides (applied in
        # init_client) do not mutate the shared class default
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # NOTE(review): the body of this branch (presumably init_helper and
        # friends) is elided from this chunk
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll once a second; _check_hotline shuts the node down when
            # the file goes missing or stale
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        self.init_web(webport) # strports string
102 def read_old_config_files(self):
103 node.Node.read_old_config_files(self)
104 copy = self._copy_config_from_file
105 copy("introducer.furl", "client", "introducer.furl")
106 copy("helper.furl", "client", "helper.furl")
107 copy("key_generator.furl", "client", "key_generator.furl")
108 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
109 if os.path.exists(os.path.join(self.basedir, "no_storage")):
110 self.set_config("storage", "enabled", "false")
111 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
112 self.set_config("storage", "readonly", "true")
113 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
114 self.set_config("storage", "debug_discard", "true")
115 if os.path.exists(os.path.join(self.basedir, "run_helper")):
116 self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        """Create the IntroducerClient and start it once the Tub is ready."""
        # [client]introducer.furl is mandatory: get_config with no default
        # will complain if it is missing
        self.introducer_furl = self.get_config("client", "introducer.furl")
        # NOTE(review): one constructor argument line (between the furl and
        # the version strings) is elided from this chunk
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
137 def init_stats_provider(self):
138 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
139 self.stats_provider = StatsProvider(self, gatherer_furl)
140 self.add_service(self.stats_provider)
141 self.stats_provider.register_producer(self)
144 return { 'node.uptime': time.time() - self.started_timestamp }
146 def init_lease_secret(self):
147 secret_s = self.get_or_create_private_config("secret", _make_secret)
148 self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        """Configure and publish a StorageServer, per the [storage] section
        of tahoe.cfg."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # reserved_space is an abbreviated size string like "5G"
        data = self.get_config("storage", "reserved_space", None)
        reserved = parse_abbreviated_size(data)
        log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        # lease-expiration (garbage-collection) settings
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        mode = self.get_config("storage", "expire.mode") # require a mode
        mode = self.get_config("storage", "expire.mode", "age")
        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            # convert a duration string ("31 days") into seconds
            o_l_d = parse_duration(o_l_d)
        if mode == "cutoff-date":
            # cutoff_date is only consulted in cutoff-date mode
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        furl_file = os.path.join(self.basedir, "private", "storage.furl")
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        # announce the storage furl to the introducer so other clients
        # can find this server
        self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
    def init_client(self):
        """Set up the upload/download machinery: encoding parameters,
        convergence secret, node cache, storage broker, and the
        History/Uploader/Downloader services."""
        helper_furl = self.get_config("client", "helper.furl", None)
        # apply tahoe.cfg overrides to this instance's encoding parameters
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        # the convergence secret makes immutable-file encryption keys
        # deterministic per user, enabling convergent encryption
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        # weak cache so repeated lookups of the same URI share one node
        # object, but unused nodes can still be collected
        self._node_cache = weakref.WeakValueDictionary() # uri -> node

        self.init_client_storage_broker()
        self.add_service(History(self.stats_provider))
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.init_stub_client()
    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        self.storage_broker = sb = storage_client.StorageFarmBroker()

        # load static server specifications from tahoe.cfg, if any
        # TODO(review): dead commented-out code below — consider deleting once
        # the [client-server-selection] static-server feature lands or is dropped
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
    def get_storage_broker(self):
        """Return the StorageFarmBroker created by init_client_storage_broker."""
        return self.storage_broker
    def init_stub_client(self):
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # NOTE(review): the StubClient construction and the _publish callback
        # definition are elided from this chunk; the registerReference/publish
        # lines below presumably live inside _publish
        furl = self.tub.registerReference(sc)
        ri_name = RIStubClient.__remote_name__
        self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def get_history(self):
        """Return the History service added in init_client."""
        return self.getServiceNamed("history")
    def init_control(self):
        """Attach a ControlServer once the Tub is ready; its furl is written
        to a private config file rather than announced."""
        d = self.when_tub_ready()
        # NOTE(review): the _publish callback definition and the ControlServer
        # construction are elided from this chunk
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        # private: only someone with local filesystem access should be able
        # to reach the control interface
        self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """Attach an upload Helper service once the Tub is ready."""
        d = self.when_tub_ready()
        # NOTE(review): the _publish callback definition line is elided from
        # this chunk; the lines below presumably form its body
        h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
        h.setServiceParent(self)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl")
        self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
314 def init_key_gen(self, key_gen_furl):
315 d = self.when_tub_ready()
316 def _subscribe(self):
317 self.tub.connectTo(key_gen_furl, self._got_key_generator)
318 d.addCallback(_subscribe)
319 d.addErrback(log.err, facility="tahoe.init",
320 level=log.BAD, umid="z9DMzw")
    def _got_key_generator(self, key_generator):
        """Record the remote key-generator reference; clear it on disconnect."""
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
    def _lost_key_generator(self):
        # fall back to local RSA generation (see _generate_pubprivkeys)
        # until the connection is re-established
        self._key_generator = None
    def get_servers(self, service_name):
        """ Return frozenset of (peerid, versioned-rref) """
        assert isinstance(service_name, str)
        # delegates to the IntroducerClient's view of announced peers
        return self.introducer_client.get_peers(service_name)
334 def init_web(self, webport):
335 self.log("init_web(webport=%s)", args=(webport,))
337 from allmydata.webish import WebishServer
338 nodeurl_path = os.path.join(self.basedir, "node.url")
339 staticdir = self.get_config("node", "web.static", "public_html")
340 staticdir = os.path.expanduser(staticdir)
341 ws = WebishServer(self, webport, nodeurl_path, staticdir)
344 def init_ftp_server(self):
345 if self.get_config("ftpd", "enabled", False, boolean=True):
346 accountfile = self.get_config("ftpd", "accounts.file", None)
347 accounturl = self.get_config("ftpd", "accounts.url", None)
348 ftp_portstr = self.get_config("ftpd", "port", "8021")
350 from allmydata.frontends import ftpd
351 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
352 s.setServiceParent(self)
354 def init_sftp_server(self):
355 if self.get_config("sftpd", "enabled", False, boolean=True):
356 accountfile = self.get_config("sftpd", "accounts.file", None)
357 accounturl = self.get_config("sftpd", "accounts.url", None)
358 sftp_portstr = self.get_config("sftpd", "port", "8022")
359 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
360 privkey_file = self.get_config("sftpd", "host_privkey_file")
362 from allmydata.frontends import sftpd
363 s = sftpd.SFTPServer(self, accountfile, accounturl,
364 sftp_portstr, pubkey_file, privkey_file)
365 s.setServiceParent(self)
    def _check_hotline(self, hotline_file):
        """TimerService callback: keep running only while hotline_file exists
        and was touched within the last two minutes; otherwise shut down."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            # fresh within 120s keeps the node alive; the shutdown calls for
            # the stale/missing cases are elided from this chunk
            if mtime > time.time() - 120.0:
            self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
    def get_encoding_parameters(self):
        """Return this client's erasure-coding parameter dict (k/happy/n/
        max_segment_size), already adjusted by tahoe.cfg in init_client."""
        return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        # NOTE(review): the fallback return for a missing introducer_client
        # is elided from this chunk
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
    def get_renewal_secret(self):
        """Return the lease-renewal secret derived from the private lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)
    def get_cancel_secret(self):
        """Return the lease-cancel secret derived from the private lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the `def _check():` header and the final `return d`
        # appear to be elided from this chunk
        current_clients = list(self.storage_broker.get_all_serverids())
        return len(current_clients) >= num_clients
        # poll every 0.5s until the predicate holds
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
405 # these four methods are the primitives for creating filenodes and
406 # dirnodes. The first takes a URI and produces a filenode or (new-style)
407 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        """Return the filenode/dirnode for URI `u`, caching by URI string so
        repeated lookups share one node object."""
        # this returns synchronously.
        # NOTE(review): the lines coercing `u` to an IURI and computing the
        # `u_s` cache key are elided from this chunk, as are the `else:`
        # branch lines of the dispatch below
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                # CHK files get a per-storage-index download cache file
                key = base32.b2a(u.storage_index)
                cachefile = self.download_cache.get_file(key)
                node = FileNode(u, self, cachefile) # CHK
            # anything else must be a mutable-file URI
            assert IMutableFileURI.providedBy(u), u
            node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
    def create_empty_dirnode(self):
        """Create a brand-new mutable directory; fires with the new
        NewDirectoryNode. (The trailing `return d` is elided from this chunk.)"""
        n = NewDirectoryNode(self)
        # key generation may be delegated to the remote key-generator
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
    def create_mutable_file(self, contents=""):
        """Create a brand-new mutable file holding `contents`; fires with the
        new MutableFileNode. (The trailing `return d` is elided from this chunk.)"""
        n = MutableFileNode(self)
        # key generation may be delegated to the remote key-generator
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
    def _generate_pubprivkeys(self, key_size):
        """Produce an RSA (verifying_key, signing_key) pair of `key_size` bits,
        remotely via the key-generator service when one is connected,
        otherwise locally (which can block for seconds)."""
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                # the remote service returns serialized key strings;
                # rehydrate them into pycryptopp key objects
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
            d.addCallback(make_key_objs)
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        signer = rsa.generate(key_size)
        verifier = signer.get_verifying_key()
        return verifier, signer
461 def upload(self, uploadable):
462 uploader = self.getServiceNamed("uploader")
463 return uploader.upload(uploadable, history=self.get_history())
    def list_all_upload_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_upload_statuses()
    def list_all_download_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_download_statuses()
    def list_all_mapupdate_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_retrieve_statuses()
    def list_all_helper_statuses(self):
        # NOTE(review): the surrounding try/except (for nodes that run no
        # helper service) appears to be elided from this chunk
        helper = self.getServiceNamed("helper")
        return helper.get_all_upload_statuses()