2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
37 class StubClient(Referenceable):
38 implements(RIStubClient)
41 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
43 class Client(node.Node, pollmixin.PollMixin):
44 implements(IStatsProducer)
46 PORTNUMFILE = "client.port"
49 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
51 # This means that if a storage server treats me as though I were a
52 # 1.0.0 storage client, it will work as they expect.
53 OLDEST_SUPPORTED_VERSION = "1.0.0"
55 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
56 # is the number of shares required to reconstruct a file. 'desired' means
57 # that we will abort an upload unless we can allocate space for at least
58 # this many. 'total' is the total number of shares created by encoding.
59 # If everybody has room then this is is how many we will upload.
60 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
63 "max_segment_size": 128*KiB,
66 def __init__(self, basedir="."):
67 node.Node.__init__(self, basedir)
68 self.started_timestamp = time.time()
69 self.logSource="Client"
70 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
71 self.init_introducer_client()
72 self.init_stats_provider()
73 self.init_lease_secret()
76 if self.get_config("helper", "enabled", False, boolean=True):
79 self._key_generator = None
80 key_gen_furl = self.get_config("client", "key_generator.furl", None)
82 self.init_key_gen(key_gen_furl)
83 # ControlServer and Helper are attached after Tub startup
84 self.init_ftp_server()
85 self.init_sftp_server()
87 hotline_file = os.path.join(self.basedir,
88 self.SUICIDE_PREVENTION_HOTLINE_FILE)
89 if os.path.exists(hotline_file):
90 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
91 self.log("hotline file noticed (%ds old), starting timer" % age)
92 hotline = TimerService(1.0, self._check_hotline, hotline_file)
93 hotline.setServiceParent(self)
95 webport = self.get_config("node", "web.port", None)
97 self.init_web(webport) # strports string
99 def read_old_config_files(self):
100 node.Node.read_old_config_files(self)
101 copy = self._copy_config_from_file
102 copy("introducer.furl", "client", "introducer.furl")
103 copy("helper.furl", "client", "helper.furl")
104 copy("key_generator.furl", "client", "key_generator.furl")
105 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
106 if os.path.exists(os.path.join(self.basedir, "no_storage")):
107 self.set_config("storage", "enabled", "false")
108 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
109 self.set_config("storage", "readonly", "true")
110 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
111 self.set_config("storage", "debug_discard", "true")
112 if os.path.exists(os.path.join(self.basedir, "run_helper")):
113 self.set_config("helper", "enabled", "true")
115 def init_introducer_client(self):
116 self.introducer_furl = self.get_config("client", "introducer.furl")
117 ic = IntroducerClient(self.tub, self.introducer_furl,
119 str(allmydata.__version__),
120 str(self.OLDEST_SUPPORTED_VERSION))
121 self.introducer_client = ic
122 # hold off on starting the IntroducerClient until our tub has been
123 # started, so we'll have a useful address on our RemoteReference, so
124 # that the introducer's status page will show us.
125 d = self.when_tub_ready()
126 def _start_introducer_client(res):
127 ic.setServiceParent(self)
128 # nodes that want to upload and download will need storage servers
129 ic.subscribe_to("storage")
130 d.addCallback(_start_introducer_client)
131 d.addErrback(log.err, facility="tahoe.init",
132 level=log.BAD, umid="URyI5w")
134 def init_stats_provider(self):
135 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
136 self.stats_provider = StatsProvider(self, gatherer_furl)
137 self.add_service(self.stats_provider)
138 self.stats_provider.register_producer(self)
141 return { 'node.uptime': time.time() - self.started_timestamp }
143 def init_lease_secret(self):
144 secret_s = self.get_or_create_private_config("secret", _make_secret)
145 self._lease_secret = base32.a2b(secret_s)
147 def init_storage(self):
148 # should we run a storage server (and publish it for others to use)?
149 if not self.get_config("storage", "enabled", True, boolean=True):
151 readonly = self.get_config("storage", "readonly", False, boolean=True)
153 storedir = os.path.join(self.basedir, self.STOREDIR)
155 data = self.get_config("storage", "reserved_space", None)
158 reserved = parse_abbreviated_size(data)
160 log.msg("[storage]reserved_space= contains unparseable value %s"
164 discard = self.get_config("storage", "debug_discard", False,
166 ss = StorageServer(storedir,
167 reserved_space=reserved,
168 discard_storage=discard,
169 readonly_storage=readonly,
170 stats_provider=self.stats_provider)
172 d = self.when_tub_ready()
173 # we can't do registerReference until the Tub is ready
175 furl_file = os.path.join(self.basedir, "private", "storage.furl")
176 furl = self.tub.registerReference(ss, furlFile=furl_file)
177 ri_name = RIStorageServer.__remote_name__
178 self.introducer_client.publish(furl, "storage", ri_name)
179 d.addCallback(_publish)
180 d.addErrback(log.err, facility="tahoe.init",
181 level=log.BAD, umid="aLGBKw")
183 def init_client(self):
184 helper_furl = self.get_config("client", "helper.furl", None)
185 DEP = self.DEFAULT_ENCODING_PARAMETERS
186 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
187 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
188 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
189 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
190 self.convergence = base32.a2b(convergence_s)
191 self._node_cache = weakref.WeakValueDictionary() # uri -> node
192 self.add_service(History(self.stats_provider))
193 self.add_service(Uploader(helper_furl, self.stats_provider))
194 download_cachedir = os.path.join(self.basedir,
195 "private", "cache", "download")
196 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
197 self.download_cache.setServiceParent(self)
198 self.add_service(Downloader(self.stats_provider))
200 # we publish an empty object so that the introducer can count how
201 # many clients are connected and see what versions they're
204 furl = self.tub.registerReference(sc)
205 ri_name = RIStubClient.__remote_name__
206 self.introducer_client.publish(furl, "stub_client", ri_name)
207 d = self.when_tub_ready()
208 d.addCallback(_publish)
209 d.addErrback(log.err, facility="tahoe.init",
210 level=log.BAD, umid="OEHq3g")
212 def get_history(self):
213 return self.getServiceNamed("history")
215 def init_control(self):
216 d = self.when_tub_ready()
219 c.setServiceParent(self)
220 control_url = self.tub.registerReference(c)
221 self.write_private_config("control.furl", control_url + "\n")
222 d.addCallback(_publish)
223 d.addErrback(log.err, facility="tahoe.init",
224 level=log.BAD, umid="d3tNXA")
226 def init_helper(self):
227 d = self.when_tub_ready()
229 h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
230 h.setServiceParent(self)
231 # TODO: this is confusing. BASEDIR/private/helper.furl is created
232 # by the helper. BASEDIR/helper.furl is consumed by the client
233 # who wants to use the helper. I like having the filename be the
234 # same, since that makes 'cp' work smoothly, but the difference
235 # between config inputs and generated outputs is hard to see.
236 helper_furlfile = os.path.join(self.basedir,
237 "private", "helper.furl")
238 self.tub.registerReference(h, furlFile=helper_furlfile)
239 d.addCallback(_publish)
240 d.addErrback(log.err, facility="tahoe.init",
241 level=log.BAD, umid="K0mW5w")
243 def init_key_gen(self, key_gen_furl):
244 d = self.when_tub_ready()
245 def _subscribe(self):
246 self.tub.connectTo(key_gen_furl, self._got_key_generator)
247 d.addCallback(_subscribe)
248 d.addErrback(log.err, facility="tahoe.init",
249 level=log.BAD, umid="z9DMzw")
251 def _got_key_generator(self, key_generator):
252 self._key_generator = key_generator
253 key_generator.notifyOnDisconnect(self._lost_key_generator)
255 def _lost_key_generator(self):
256 self._key_generator = None
258 def get_servers(self, service_name):
259 """ Return set of (peerid, versioned-rref) """
260 assert isinstance(service_name, str)
261 return self.introducer_client.get_peers(service_name)
263 def init_web(self, webport):
264 self.log("init_web(webport=%s)", args=(webport,))
266 from allmydata.webish import WebishServer
267 nodeurl_path = os.path.join(self.basedir, "node.url")
268 staticdir = self.get_config("node", "web.static", "public_html")
269 staticdir = os.path.expanduser(staticdir)
270 # should we provide ambient upload authority?
271 ambientUploadAuthority = self.get_config("node", "web.ambient_upload_authority", True, boolean=True)
272 ws = WebishServer(webport, nodeurl_path, staticdir, ambientUploadAuthority)
275 def init_ftp_server(self):
276 if self.get_config("ftpd", "enabled", False, boolean=True):
277 accountfile = self.get_config("ftpd", "accounts.file", None)
278 accounturl = self.get_config("ftpd", "accounts.url", None)
279 ftp_portstr = self.get_config("ftpd", "port", "8021")
281 from allmydata.frontends import ftpd
282 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
283 s.setServiceParent(self)
285 def init_sftp_server(self):
286 if self.get_config("sftpd", "enabled", False, boolean=True):
287 accountfile = self.get_config("sftpd", "accounts.file", None)
288 accounturl = self.get_config("sftpd", "accounts.url", None)
289 sftp_portstr = self.get_config("sftpd", "port", "8022")
290 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
291 privkey_file = self.get_config("sftpd", "host_privkey_file")
293 from allmydata.frontends import sftpd
294 s = sftpd.SFTPServer(self, accountfile, accounturl,
295 sftp_portstr, pubkey_file, privkey_file)
296 s.setServiceParent(self)
298 def _check_hotline(self, hotline_file):
299 if os.path.exists(hotline_file):
300 mtime = os.stat(hotline_file)[stat.ST_MTIME]
301 if mtime > time.time() - 40.0:
304 self.log("hotline file too old, shutting down")
306 self.log("hotline file missing, shutting down")
309 def get_all_peerids(self):
310 return self.introducer_client.get_all_peerids()
311 def get_nickname_for_peerid(self, peerid):
312 return self.introducer_client.get_nickname_for_peerid(peerid)
314 def get_permuted_peers(self, service_name, key):
316 @return: list of (peerid, connection,)
318 assert isinstance(service_name, str)
319 assert isinstance(key, str)
320 return self.introducer_client.get_permuted_peers(service_name, key)
322 def get_encoding_parameters(self):
323 return self.DEFAULT_ENCODING_PARAMETERS
325 def connected_to_introducer(self):
326 if self.introducer_client:
327 return self.introducer_client.connected_to_introducer()
330 def get_renewal_secret(self):
331 return hashutil.my_renewal_secret_hash(self._lease_secret)
333 def get_cancel_secret(self):
334 return hashutil.my_cancel_secret_hash(self._lease_secret)
336 def debug_wait_for_client_connections(self, num_clients):
337 """Return a Deferred that fires (with None) when we have connections
338 to the given number of peers. Useful for tests that set up a
339 temporary test network and need to know when it is safe to proceed
340 with an upload or download."""
342 current_clients = list(self.get_all_peerids())
343 return len(current_clients) >= num_clients
344 d = self.poll(_check, 0.5)
345 d.addCallback(lambda res: None)
349 # these four methods are the primitives for creating filenodes and
350 # dirnodes. The first takes a URI and produces a filenode or (new-style)
351 # dirnode. The other three create brand-new filenodes/dirnodes.
353 def create_node_from_uri(self, u):
354 # this returns synchronously.
357 if u_s not in self._node_cache:
358 if IReadonlyNewDirectoryURI.providedBy(u):
359 # new-style read-only dirnodes
360 node = NewDirectoryNode(self).init_from_uri(u)
361 elif INewDirectoryURI.providedBy(u):
363 node = NewDirectoryNode(self).init_from_uri(u)
364 elif IFileURI.providedBy(u):
365 if isinstance(u, LiteralFileURI):
366 node = LiteralFileNode(u, self) # LIT
368 key = base32.b2a(u.storage_index)
369 cachefile = self.download_cache.get_file(key)
370 node = FileNode(u, self, cachefile) # CHK
372 assert IMutableFileURI.providedBy(u), u
373 node = MutableFileNode(self).init_from_uri(u)
374 self._node_cache[u_s] = node
375 return self._node_cache[u_s]
377 def create_empty_dirnode(self):
378 n = NewDirectoryNode(self)
379 d = n.create(self._generate_pubprivkeys)
380 d.addCallback(lambda res: n)
383 def create_mutable_file(self, contents=""):
384 n = MutableFileNode(self)
385 d = n.create(contents, self._generate_pubprivkeys)
386 d.addCallback(lambda res: n)
389 def _generate_pubprivkeys(self, key_size):
390 if self._key_generator:
391 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
392 def make_key_objs((verifying_key, signing_key)):
393 v = rsa.create_verifying_key_from_string(verifying_key)
394 s = rsa.create_signing_key_from_string(signing_key)
396 d.addCallback(make_key_objs)
399 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
401 signer = rsa.generate(key_size)
402 verifier = signer.get_verifying_key()
403 return verifier, signer
405 def upload(self, uploadable):
406 uploader = self.getServiceNamed("uploader")
407 return uploader.upload(uploadable, history=self.get_history())
410 def list_all_upload_statuses(self):
411 return self.get_history().list_all_upload_statuses()
413 def list_all_download_statuses(self):
414 return self.get_history().list_all_download_statuses()
416 def list_all_mapupdate_statuses(self):
417 return self.get_history().list_all_mapupdate_statuses()
418 def list_all_publish_statuses(self):
419 return self.get_history().list_all_publish_statuses()
420 def list_all_retrieve_statuses(self):
421 return self.get_history().list_all_retrieve_statuses()
423 def list_all_helper_statuses(self):
425 helper = self.getServiceNamed("helper")
428 return helper.get_all_upload_statuses()