1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    # Minimal referenceable published under "stub_client" so the introducer
    # can count connected clients; RIStubClient defines no remote methods.
    implements(RIStubClient)

# NOTE(review): the "def _make_secret():" header appears elided from this
# copy; the following line is its body — a fresh random secret, base32-encoded
# with a trailing newline so it round-trips through a private config file.
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: wires the introducer client, secrets, storage,
    upload/download services and frontends together under one basedir."""
    implements(IStatsProducer)

    # file (under basedir) that records the port our Tub listens on
    PORTNUMFILE = "client.port"
    # touching this file periodically keeps the node alive; see _check_hotline
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   # NOTE(review): the "happy" and "n" entries
                                   # and the closing brace appear elided from
                                   # this copy (init_client reads both keys);
                                   # KiB is presumably a module-level constant
                                   # (1024) whose definition is also elided.
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        """Create the client node rooted at *basedir* and start all of its
        sub-services (introducer client, stats, secrets, frontends, web).

        NOTE(review): several lines appear elided from this copy — the
        init_storage/init_control/init_client calls, the bodies/guards of the
        helper and key-generator conditionals, and the "if webport:" guard.
        """
        node.Node.__init__(self, basedir)
        # recorded so get_stats can report node.uptime
        self.started_timestamp = time.time()
        self.logSource="Client"
        # per-instance copy, so init_client can overlay [client]shares.* config
        # without mutating the class-level defaults
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists at startup, begin polling it; the node
        # shuts itself down when the file goes stale (see _check_hotline)
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        self.init_web(webport) # strports string
101 def read_old_config_files(self):
102 node.Node.read_old_config_files(self)
103 copy = self._copy_config_from_file
104 copy("introducer.furl", "client", "introducer.furl")
105 copy("helper.furl", "client", "helper.furl")
106 copy("key_generator.furl", "client", "key_generator.furl")
107 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
108 if os.path.exists(os.path.join(self.basedir, "no_storage")):
109 self.set_config("storage", "enabled", "false")
110 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
111 self.set_config("storage", "readonly", "true")
112 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
113 self.set_config("storage", "debug_discard", "true")
114 if os.path.exists(os.path.join(self.basedir, "run_helper")):
115 self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        """Create the IntroducerClient and arrange to start it (and subscribe
        to "storage" announcements) once our Tub is listening."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              # NOTE(review): one constructor argument
                              # (presumably the node nickname) appears elided
                              # from this copy.
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
136 def init_stats_provider(self):
137 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
138 self.stats_provider = StatsProvider(self, gatherer_furl)
139 self.add_service(self.stats_provider)
140 self.stats_provider.register_producer(self)
143 return { 'node.uptime': time.time() - self.started_timestamp }
145 def init_lease_secret(self):
146 secret_s = self.get_or_create_private_config("secret", _make_secret)
147 self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        """Create, configure, and publish the StorageServer service.

        Reads [storage] settings (enabled, readonly, reserved_space,
        debug_discard) and the lease-expiration "expire.*" knobs, then
        registers the server's FURL with the introducer under "storage".
        NOTE(review): this copy appears to be missing several lines — the
        early return when storage is disabled, the try/except around
        reserved_space parsing, two line continuations, the expire.mode
        dispatch, the sharetypes list initialization, and the
        "def _publish(res):" header.
        """
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        data = self.get_config("storage", "reserved_space", None)
            reserved = parse_abbreviated_size(data)
            log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        mode = self.get_config("storage", "expire.mode") # require a mode
        mode = self.get_config("storage", "expire.mode", "age")
        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            # convert a human-readable duration ("31days") into seconds
            o_l_d = parse_duration(o_l_d)
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
214 def init_client(self):
215 helper_furl = self.get_config("client", "helper.furl", None)
216 DEP = self.DEFAULT_ENCODING_PARAMETERS
217 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
218 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
219 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
220 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
221 self.convergence = base32.a2b(convergence_s)
222 self._node_cache = weakref.WeakValueDictionary() # uri -> node
223 self.add_service(History(self.stats_provider))
224 self.add_service(Uploader(helper_furl, self.stats_provider))
225 download_cachedir = os.path.join(self.basedir,
226 "private", "cache", "download")
227 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
228 self.download_cache.setServiceParent(self)
229 self.add_service(Downloader(self.stats_provider))
230 self.init_stub_client()
    def init_stub_client(self):
        """Register a StubClient with the introducer once the Tub is ready,
        so the introducer can count connected clients."""
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # NOTE(review): the StubClient construction (something like
        # "sc = StubClient()") and the "def _publish(res):" header appear
        # elided from this copy.
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
246 def get_history(self):
247 return self.getServiceNamed("history")
    def init_control(self):
        """Attach a ControlServer once the Tub is ready and record its FURL
        in private/control.furl for debug tooling.

        NOTE(review): the "def _publish(res):" header and the ControlServer
        construction appear elided from this copy.
        """
        d = self.when_tub_ready()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """Start the upload Helper service and register it under the FURL
        stored in BASEDIR/private/helper.furl, once the Tub is ready.

        NOTE(review): the "def _publish(res):" header appears elided from
        this copy.
        """
        d = self.when_tub_ready()
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
277 def init_key_gen(self, key_gen_furl):
278 d = self.when_tub_ready()
279 def _subscribe(self):
280 self.tub.connectTo(key_gen_furl, self._got_key_generator)
281 d.addCallback(_subscribe)
282 d.addErrback(log.err, facility="tahoe.init",
283 level=log.BAD, umid="z9DMzw")
    def _got_key_generator(self, key_generator):
        """connectTo callback: cache the live key-generator reference.

        The assignment comes first because notifyOnDisconnect may invoke
        _lost_key_generator immediately if the connection is already gone,
        and that must be able to clear the attribute we just set.
        """
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
    def _lost_key_generator(self):
        # disconnect callback: _generate_pubprivkeys falls back to local RSA
        # generation while this is None (until connectTo reconnects us)
        self._key_generator = None
292 def get_servers(self, service_name):
293 """ Return frozenset of (peerid, versioned-rref) """
294 assert isinstance(service_name, str)
295 return self.introducer_client.get_peers(service_name)
    def init_web(self, webport):
        """Create the WebishServer frontend listening on *webport* (a
        twisted strports string).

        NOTE(review): the trailing line(s) that attach the created server
        (e.g. an add_service call) appear elided from this copy.
        """
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        # expand "~" so users can point web.static at a home directory
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
307 def init_ftp_server(self):
308 if self.get_config("ftpd", "enabled", False, boolean=True):
309 accountfile = self.get_config("ftpd", "accounts.file", None)
310 accounturl = self.get_config("ftpd", "accounts.url", None)
311 ftp_portstr = self.get_config("ftpd", "port", "8021")
313 from allmydata.frontends import ftpd
314 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
315 s.setServiceParent(self)
317 def init_sftp_server(self):
318 if self.get_config("sftpd", "enabled", False, boolean=True):
319 accountfile = self.get_config("sftpd", "accounts.file", None)
320 accounturl = self.get_config("sftpd", "accounts.url", None)
321 sftp_portstr = self.get_config("sftpd", "port", "8022")
322 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
323 privkey_file = self.get_config("sftpd", "host_privkey_file")
325 from allmydata.frontends import sftpd
326 s = sftpd.SFTPServer(self, accountfile, accounturl,
327 sftp_portstr, pubkey_file, privkey_file)
328 s.setServiceParent(self)
    def _check_hotline(self, hotline_file):
        """TimerService callback (every 1s): keep running only while the
        hotline file exists and was touched within the last 120 seconds.

        NOTE(review): the early return for the fresh-file case, the else
        branches, and the actual shutdown call(s) appear elided from this
        copy.
        """
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
            self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
341 def get_all_peerids(self):
342 return self.introducer_client.get_all_peerids()
343 def get_nickname_for_peerid(self, peerid):
344 return self.introducer_client.get_nickname_for_peerid(peerid)
    def get_permuted_peers(self, service_name, key):
        """Permuted list of peers offering *service_name*, ordered by *key*.

        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
354 def get_encoding_parameters(self):
355 return self.DEFAULT_ENCODING_PARAMETERS
357 def connected_to_introducer(self):
358 if self.introducer_client:
359 return self.introducer_client.connected_to_introducer()
362 def get_renewal_secret(self):
363 return hashutil.my_renewal_secret_hash(self._lease_secret)
365 def get_cancel_secret(self):
366 return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download.

        NOTE(review): the "def _check():" header and the trailing
        "return d" appear elided from this copy.
        """
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
381 # these four methods are the primitives for creating filenodes and
382 # dirnodes. The first takes a URI and produces a filenode or (new-style)
383 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        """Map URI *u* to the right filenode/dirnode type, memoized through
        self._node_cache so one URI always yields the same node object.

        NOTE(review): this copy appears to be missing the IURI coercion and
        "u_s = u.to_string()" lines near the top, plus the two "else:" lines
        for the CHK and mutable branches.
        """
        # this returns synchronously.
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style (writable) dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                    key = base32.b2a(u.storage_index)
                    cachefile = self.download_cache.get_file(key)
                    node = FileNode(u, self, cachefile) # CHK
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
409 def create_empty_dirnode(self):
410 n = NewDirectoryNode(self)
411 d = n.create(self._generate_pubprivkeys)
412 d.addCallback(lambda res: n)
415 def create_mutable_file(self, contents=""):
416 n = MutableFileNode(self)
417 d = n.create(contents, self._generate_pubprivkeys)
418 d.addCallback(lambda res: n)
421 def _generate_pubprivkeys(self, key_size):
422 if self._key_generator:
423 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
424 def make_key_objs((verifying_key, signing_key)):
425 v = rsa.create_verifying_key_from_string(verifying_key)
426 s = rsa.create_signing_key_from_string(signing_key)
428 d.addCallback(make_key_objs)
431 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
433 signer = rsa.generate(key_size)
434 verifier = signer.get_verifying_key()
435 return verifier, signer
437 def upload(self, uploadable):
438 uploader = self.getServiceNamed("uploader")
439 return uploader.upload(uploadable, history=self.get_history())
442 def list_all_upload_statuses(self):
443 return self.get_history().list_all_upload_statuses()
445 def list_all_download_statuses(self):
446 return self.get_history().list_all_download_statuses()
448 def list_all_mapupdate_statuses(self):
449 return self.get_history().list_all_mapupdate_statuses()
450 def list_all_publish_statuses(self):
451 return self.get_history().list_all_publish_statuses()
452 def list_all_retrieve_statuses(self):
453 return self.get_history().list_all_retrieve_statuses()
455 def list_all_helper_statuses(self):
457 helper = self.getServiceNamed("helper")
460 return helper.get_all_upload_statuses()