1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
13 from allmydata.storage.server import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.filenode import MutableFileNode
25 from allmydata.stats import StatsProvider
26 from allmydata.history import History
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    """Minimal remotely-referenceable object; published under the
    'stub_client' service name (see init_stub_client) so the introducer
    can count connected clients."""
    implements(RIStubClient)
40 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
# NOTE(review): this view of the file is mangled -- each line carries a stray
# embedded line number, and several original lines are missing (e.g. the
# remaining DEFAULT_ENCODING_PARAMETERS entries and the KiB constant it
# references). Confirm against the full file before editing.
42 class Client(node.Node, pollmixin.PollMixin):
43 implements(IStatsProducer)
# file (under basedir) recording the port this client listens on
45 PORTNUMFILE = "client.port"
# touching this file in basedir arms a keep-alive timer; see _check_hotline
48 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
50 # This means that if a storage server treats me as though I were a
51 # 1.0.0 storage client, it will work as they expect.
52 OLDEST_SUPPORTED_VERSION = "1.0.0"
54 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
55 # is the number of shares required to reconstruct a file. 'desired' means
56 # that we will abort an upload unless we can allocate space for at least
57 # this many. 'total' is the total number of shares created by encoding.
58 # If everybody has room then this is how many we will upload.
59 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
# (the "happy" and "n" entries and the closing brace are not visible here)
62 "max_segment_size": 128*KiB,
# Initialize the client node: run the base Node setup, then start each
# subsystem (introducer client, stats, lease secret, optional helper and
# key generator, FTP/SFTP frontends, hotline watchdog, web frontend).
# NOTE(review): several lines are missing from this view (e.g. the body of
# the [helper]enabled branch at 75, and the guard around init_key_gen at 81
# -- presumably `if key_gen_furl:`); confirm against the full file.
65 def __init__(self, basedir="."):
66 node.Node.__init__(self, basedir)
# remember startup time (used for uptime reporting, e.g. 'node.uptime')
67 self.started_timestamp = time.time()
68 self.logSource="Client"
# copy the class-level dict so per-instance config edits don't mutate it
69 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
70 self.init_introducer_client()
71 self.init_stats_provider()
72 self.init_lease_secret()
75 if self.get_config("helper", "enabled", False, boolean=True):
78 self._key_generator = None
79 key_gen_furl = self.get_config("client", "key_generator.furl", None)
81 self.init_key_gen(key_gen_furl)
82 # ControlServer and Helper are attached after Tub startup
83 self.init_ftp_server()
84 self.init_sftp_server()
# watchdog: if the hotline file exists, poll it so the node shuts itself
# down once the file goes stale (see _check_hotline)
86 hotline_file = os.path.join(self.basedir,
87 self.SUICIDE_PREVENTION_HOTLINE_FILE)
88 if os.path.exists(hotline_file):
89 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
90 self.log("hotline file noticed (%ds old), starting timer" % age)
91 hotline = TimerService(1.0, self._check_hotline, hotline_file)
92 hotline.setServiceParent(self)
94 # this needs to happen last, so it can use getServiceNamed() to
95 # acquire references to StorageServer and other web-statusable things
96 webport = self.get_config("node", "web.port", None)
98 self.init_web(webport) # strports string
100 def read_old_config_files(self):
101 node.Node.read_old_config_files(self)
102 copy = self._copy_config_from_file
103 copy("introducer.furl", "client", "introducer.furl")
104 copy("helper.furl", "client", "helper.furl")
105 copy("key_generator.furl", "client", "key_generator.furl")
106 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
107 if os.path.exists(os.path.join(self.basedir, "no_storage")):
108 self.set_config("storage", "enabled", "false")
109 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
110 self.set_config("storage", "readonly", "true")
111 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
112 self.set_config("storage", "debug_discard", "true")
113 if os.path.exists(os.path.join(self.basedir, "run_helper")):
114 self.set_config("helper", "enabled", "true")
# Create the IntroducerClient and start it once the Tub is ready.
# NOTE(review): original line 119 (one of the IntroducerClient constructor
# arguments, between the furl and the version string) is missing from this
# view; also the `import allmydata` that __full_version__ relies on is not
# visible among the imports shown. Confirm against the full file.
116 def init_introducer_client(self):
117 self.introducer_furl = self.get_config("client", "introducer.furl")
118 ic = IntroducerClient(self.tub, self.introducer_furl,
120 str(allmydata.__full_version__),
121 str(self.OLDEST_SUPPORTED_VERSION))
122 self.introducer_client = ic
123 # hold off on starting the IntroducerClient until our tub has been
124 # started, so we'll have a useful address on our RemoteReference, so
125 # that the introducer's status page will show us.
126 d = self.when_tub_ready()
127 def _start_introducer_client(res):
128 ic.setServiceParent(self)
129 # nodes that want to upload and download will need storage servers
130 ic.subscribe_to("storage")
131 d.addCallback(_start_introducer_client)
# any failure is logged rather than raised, since this runs from a Deferred
132 d.addErrback(log.err, facility="tahoe.init",
133 level=log.BAD, umid="URyI5w")
135 def init_stats_provider(self):
136 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
137 self.stats_provider = StatsProvider(self, gatherer_furl)
138 self.add_service(self.stats_provider)
139 self.stats_provider.register_producer(self)
142 return { 'node.uptime': time.time() - self.started_timestamp }
144 def init_lease_secret(self):
145 secret_s = self.get_or_create_private_config("secret", _make_secret)
146 self._lease_secret = base32.a2b(secret_s)
# Optionally create and publish a StorageServer, honoring the [storage]
# config section (enabled/readonly/reserved_space/debug_discard).
# NOTE(review): many lines are missing from this view: the early return
# after the disabled check (151), the try/except presumably wrapping
# parse_abbreviated_size (157-158, 160, 162-164), the continuation of the
# debug_discard get_config call (166), the add_service call (172), and the
# `def _publish(res):` header (175). Confirm against the full file.
148 def init_storage(self):
149 # should we run a storage server (and publish it for others to use)?
150 if not self.get_config("storage", "enabled", True, boolean=True):
152 readonly = self.get_config("storage", "readonly", False, boolean=True)
154 storedir = os.path.join(self.basedir, self.STOREDIR)
# reserved_space is a human-friendly size string ("1G" etc.)
156 data = self.get_config("storage", "reserved_space", None)
159 reserved = parse_abbreviated_size(data)
161 log.msg("[storage]reserved_space= contains unparseable value %s"
165 discard = self.get_config("storage", "debug_discard", False,
167 ss = StorageServer(storedir, self.nodeid,
168 reserved_space=reserved,
169 discard_storage=discard,
170 readonly_storage=readonly,
171 stats_provider=self.stats_provider)
173 d = self.when_tub_ready()
174 # we can't do registerReference until the Tub is ready
176 furl_file = os.path.join(self.basedir, "private", "storage.furl")
177 furl = self.tub.registerReference(ss, furlFile=furl_file)
178 ri_name = RIStorageServer.__remote_name__
# announce our storage service to the introducer so peers can find it
179 self.introducer_client.publish(furl, "storage", ri_name)
180 d.addCallback(_publish)
181 d.addErrback(log.err, facility="tahoe.init",
182 level=log.BAD, umid="aLGBKw")
184 def init_client(self):
185 helper_furl = self.get_config("client", "helper.furl", None)
186 DEP = self.DEFAULT_ENCODING_PARAMETERS
187 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
188 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
189 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
190 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
191 self.convergence = base32.a2b(convergence_s)
192 self._node_cache = weakref.WeakValueDictionary() # uri -> node
193 self.add_service(History(self.stats_provider))
194 self.add_service(Uploader(helper_furl, self.stats_provider))
195 download_cachedir = os.path.join(self.basedir,
196 "private", "cache", "download")
197 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
198 self.download_cache.setServiceParent(self)
199 self.add_service(Downloader(self.stats_provider))
200 self.init_stub_client()
# Publish a StubClient object so the introducer can see this client.
# NOTE(review): lines are missing from this view: the end of the comment
# and the `sc = StubClient()` creation (206-207), and the `def _publish(res):`
# header that lines 208-210 belong to. Confirm against the full file.
202 def init_stub_client(self):
204 # we publish an empty object so that the introducer can count how
205 # many clients are connected and see what versions they're
208 furl = self.tub.registerReference(sc)
209 ri_name = RIStubClient.__remote_name__
210 self.introducer_client.publish(furl, "stub_client", ri_name)
# registration must wait for the Tub, like the other publish steps here
211 d = self.when_tub_ready()
212 d.addCallback(_publish)
213 d.addErrback(log.err, facility="tahoe.init",
214 level=log.BAD, umid="OEHq3g")
216 def get_history(self):
217 return self.getServiceNamed("history")
# Attach the ControlServer once the Tub is ready and write its furl to
# private/control.furl for local tools to use.
# NOTE(review): lines 221-222 are missing from this view -- presumably the
# `def _publish(res):` header and the ControlServer creation that lines
# 223-225 belong to. Confirm against the full file.
219 def init_control(self):
220 d = self.when_tub_ready()
223 c.setServiceParent(self)
224 control_url = self.tub.registerReference(c)
# control.furl is private: it grants control over this node
225 self.write_private_config("control.furl", control_url + "\n")
226 d.addCallback(_publish)
227 d.addErrback(log.err, facility="tahoe.init",
228 level=log.BAD, umid="d3tNXA")
# Run an upload Helper service and register it under private/helper.furl.
# NOTE(review): line 232 is missing from this view -- presumably the
# `def _publish(res):` header that the following lines belong to.
230 def init_helper(self):
231 d = self.when_tub_ready()
233 h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
234 h.setServiceParent(self)
235 # TODO: this is confusing. BASEDIR/private/helper.furl is created
236 # by the helper. BASEDIR/helper.furl is consumed by the client
237 # who wants to use the helper. I like having the filename be the
238 # same, since that makes 'cp' work smoothly, but the difference
239 # between config inputs and generated outputs is hard to see.
240 helper_furlfile = os.path.join(self.basedir,
241 "private", "helper.furl")
242 self.tub.registerReference(h, furlFile=helper_furlfile)
243 d.addCallback(_publish)
244 d.addErrback(log.err, facility="tahoe.init",
245 level=log.BAD, umid="K0mW5w")
247 def init_key_gen(self, key_gen_furl):
248 d = self.when_tub_ready()
249 def _subscribe(self):
250 self.tub.connectTo(key_gen_furl, self._got_key_generator)
251 d.addCallback(_subscribe)
252 d.addErrback(log.err, facility="tahoe.init",
253 level=log.BAD, umid="z9DMzw")
255 def _got_key_generator(self, key_generator):
256 self._key_generator = key_generator
257 key_generator.notifyOnDisconnect(self._lost_key_generator)
259 def _lost_key_generator(self):
260 self._key_generator = None
262 def get_servers(self, service_name):
263 """ Return frozenset of (peerid, versioned-rref) """
264 assert isinstance(service_name, str)
265 return self.introducer_client.get_peers(service_name)
# Start the web frontend listening on the given strports string.
# NOTE(review): the tail of this method is missing from this view --
# presumably `ws.setServiceParent(self)` (original line ~275). Confirm
# against the full file.
267 def init_web(self, webport):
268 self.log("init_web(webport=%s)", args=(webport,))
# imported lazily so nodes without a web port don't pay for webish
270 from allmydata.webish import WebishServer
271 nodeurl_path = os.path.join(self.basedir, "node.url")
272 staticdir = self.get_config("node", "web.static", "public_html")
# allow ~user paths in [node]web.static
273 staticdir = os.path.expanduser(staticdir)
274 ws = WebishServer(self, webport, nodeurl_path, staticdir)
277 def init_ftp_server(self):
278 if self.get_config("ftpd", "enabled", False, boolean=True):
279 accountfile = self.get_config("ftpd", "accounts.file", None)
280 accounturl = self.get_config("ftpd", "accounts.url", None)
281 ftp_portstr = self.get_config("ftpd", "port", "8021")
283 from allmydata.frontends import ftpd
284 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
285 s.setServiceParent(self)
287 def init_sftp_server(self):
288 if self.get_config("sftpd", "enabled", False, boolean=True):
289 accountfile = self.get_config("sftpd", "accounts.file", None)
290 accounturl = self.get_config("sftpd", "accounts.url", None)
291 sftp_portstr = self.get_config("sftpd", "port", "8022")
292 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
293 privkey_file = self.get_config("sftpd", "host_privkey_file")
295 from allmydata.frontends import sftpd
296 s = sftpd.SFTPServer(self, accountfile, accounturl,
297 sftp_portstr, pubkey_file, privkey_file)
298 s.setServiceParent(self)
# TimerService callback: keep running only while the hotline file exists
# and has been touched within the last ~60 seconds.
# NOTE(review): lines are missing from this view -- presumably the early
# return when the file is fresh (304-305) and the actual shutdown calls
# after each log message (307, 309+). Confirm against the full file.
300 def _check_hotline(self, hotline_file):
301 if os.path.exists(hotline_file):
302 mtime = os.stat(hotline_file)[stat.ST_MTIME]
# fresh means modified within the last 60 seconds
303 if mtime > time.time() - 60.0:
306 self.log("hotline file too old, shutting down")
308 self.log("hotline file missing, shutting down")
311 def get_all_peerids(self):
312 return self.introducer_client.get_all_peerids()
313 def get_nickname_for_peerid(self, peerid):
314 return self.introducer_client.get_nickname_for_peerid(peerid)
316 def get_permuted_peers(self, service_name, key):
318 @return: list of (peerid, connection,)
320 assert isinstance(service_name, str)
321 assert isinstance(key, str)
322 return self.introducer_client.get_permuted_peers(service_name, key)
324 def get_encoding_parameters(self):
325 return self.DEFAULT_ENCODING_PARAMETERS
# Report whether we currently hold a connection to the introducer.
# NOTE(review): the fall-through branch (presumably `return False` when no
# introducer client exists, original line ~330) is missing from this view.
327 def connected_to_introducer(self):
328 if self.introducer_client:
329 return self.introducer_client.connected_to_introducer()
332 def get_renewal_secret(self):
333 return hashutil.my_renewal_secret_hash(self._lease_secret)
335 def get_cancel_secret(self):
336 return hashutil.my_cancel_secret_hash(self._lease_secret)
# NOTE(review): lines are missing from this view -- the `def _check():`
# header for the polled predicate (343) and the trailing `return d` (348).
# Confirm against the full file.
338 def debug_wait_for_client_connections(self, num_clients):
339 """Return a Deferred that fires (with None) when we have connections
340 to the given number of peers. Useful for tests that set up a
341 temporary test network and need to know when it is safe to proceed
342 with an upload or download."""
344 current_clients = list(self.get_all_peerids())
345 return len(current_clients) >= num_clients
# poll the predicate every 0.5s (PollMixin), then discard its result
346 d = self.poll(_check, 0.5)
347 d.addCallback(lambda res: None)
351 # these four methods are the primitives for creating filenodes and
352 # dirnodes. The first takes a URI and produces a filenode or (new-style)
353 # dirnode. The other three create brand-new filenodes/dirnodes.
# Turn a URI into the matching node object (dirnode, literal/CHK filenode,
# or mutable filenode), memoized in the weak-value _node_cache.
# NOTE(review): lines are missing from this view -- presumably the
# normalization that defines `u` and `u_s` (357-358, e.g. IURI adaption and
# u.to_string()), and the `else:` lines introducing the CHK branch (369)
# and the mutable-file fallback (373). Confirm against the full file.
355 def create_node_from_uri(self, u):
356 # this returns synchronously.
359 if u_s not in self._node_cache:
360 if IReadonlyNewDirectoryURI.providedBy(u):
361 # new-style read-only dirnodes
362 node = NewDirectoryNode(self).init_from_uri(u)
363 elif INewDirectoryURI.providedBy(u):
365 node = NewDirectoryNode(self).init_from_uri(u)
366 elif IFileURI.providedBy(u):
367 if isinstance(u, LiteralFileURI):
368 node = LiteralFileNode(u, self) # LIT
# CHK files get a per-storage-index entry in the download cache
370 key = base32.b2a(u.storage_index)
371 cachefile = self.download_cache.get_file(key)
372 node = FileNode(u, self, cachefile) # CHK
374 assert IMutableFileURI.providedBy(u), u
375 node = MutableFileNode(self).init_from_uri(u)
376 self._node_cache[u_s] = node
377 return self._node_cache[u_s]
# Create a brand-new (empty) directory node.
# NOTE(review): the trailing `return d` (original line ~383) is missing
# from this view; the Deferred presumably fires with the new dirnode.
379 def create_empty_dirnode(self):
380 n = NewDirectoryNode(self)
381 d = n.create(self._generate_pubprivkeys)
382 d.addCallback(lambda res: n)
# Create a brand-new mutable file holding *contents*.
# NOTE(review): the trailing `return d` (original line ~389) is missing
# from this view; the Deferred presumably fires with the new filenode.
385 def create_mutable_file(self, contents=""):
386 n = MutableFileNode(self)
387 d = n.create(contents, self._generate_pubprivkeys)
388 d.addCallback(lambda res: n)
# Produce an RSA (verifying_key, signing_key) pair of *key_size* bits,
# preferring the remote key-generator service when one is connected,
# otherwise generating locally with pycryptopp.
# NOTE(review): lines are missing from this view -- the return inside
# make_key_objs (397), the `return d` for the remote branch (399), the
# `else:` introducing the local branch (400), and part of the timing
# comment (402). Confirm against the full file.
391 def _generate_pubprivkeys(self, key_size):
392 if self._key_generator:
# ask the remote service; it returns serialized key strings
393 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
394 def make_key_objs((verifying_key, signing_key)):
395 v = rsa.create_verifying_key_from_string(verifying_key)
396 s = rsa.create_signing_key_from_string(signing_key)
398 d.addCallback(make_key_objs)
401 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
403 signer = rsa.generate(key_size)
404 verifier = signer.get_verifying_key()
405 return verifier, signer
407 def upload(self, uploadable):
408 uploader = self.getServiceNamed("uploader")
409 return uploader.upload(uploadable, history=self.get_history())
412 def list_all_upload_statuses(self):
413 return self.get_history().list_all_upload_statuses()
415 def list_all_download_statuses(self):
416 return self.get_history().list_all_download_statuses()
418 def list_all_mapupdate_statuses(self):
419 return self.get_history().list_all_mapupdate_statuses()
420 def list_all_publish_statuses(self):
421 return self.get_history().list_all_publish_statuses()
422 def list_all_retrieve_statuses(self):
423 return self.get_history().list_all_retrieve_statuses()
# Return upload statuses from the Helper service, if one is running.
# NOTE(review): lines are missing from this view -- presumably a
# `try:` (426) and an `except KeyError:` fallback (428-429) around the
# getServiceNamed lookup, since not every node runs a helper. Confirm
# against the full file.
425 def list_all_helper_statuses(self):
427 helper = self.getServiceNamed("helper")
430 return helper.get_all_upload_statuses()