2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    # An empty remote object published under the "stub_client" service name
    # (see init_client below) so the introducer can count connected clients
    # and see what versions they are running.
    implements(RIStubClient)
39 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: connects to the introducer, runs the upload,
    download, and mutable-file services, and optionally storage/helper/web
    frontends. Also acts as an IStatsProducer for its StatsProvider."""
    implements(IStatsProducer)

    # filename (under basedir) recording the node's listen port --
    # presumably consumed by node.Node; confirm against node.py
    PORTNUMFILE = "client.port"
    # while this file exists and is freshly touched, the node keeps running;
    # checked once per second by _check_hotline (see __init__)
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    # NOTE(review): the "happy" and "n" entries (read by init_client) and the
    # closing brace appear to be elided in this view -- confirm against the
    # full source.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        """Initialize the client node rooted at 'basedir' and start all of
        its optional sub-services based on the node's configuration."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # per-instance copy so config overrides in init_client never mutate
        # the class-level defaults
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # NOTE(review): lines elided here (presumably init_storage and the
        # helper-enabled branch body) -- confirm against the full source
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        # NOTE(review): the guard ('if key_gen_furl:') appears to be elided
        self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll the hotline file every second; _check_hotline shuts the
            # node down when the file is stale or removed
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        # NOTE(review): the 'if webport:' guard appears to be elided here
        self.init_web(webport) # strports string
97 def read_old_config_files(self):
98 node.Node.read_old_config_files(self)
99 copy = self._copy_config_from_file
100 copy("introducer.furl", "client", "introducer.furl")
101 copy("helper.furl", "client", "helper.furl")
102 copy("key_generator.furl", "client", "key_generator.furl")
103 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
104 if os.path.exists(os.path.join(self.basedir, "no_storage")):
105 self.set_config("storage", "enabled", "false")
106 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
107 self.set_config("storage", "readonly", "true")
108 copy("sizelimit", "storage", "sizelimit")
109 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
110 self.set_config("storage", "debug_discard", "true")
111 if os.path.exists(os.path.join(self.basedir, "run_helper")):
112 self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        """Create the IntroducerClient and start it once our Tub is ready."""
        # no default: a missing introducer.furl is treated as a config error
        self.introducer_furl = self.get_config("client", "introducer.furl")
        # NOTE(review): one constructor argument line (between the furl and
        # the version string) appears to be elided here -- confirm
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
    def init_stats_provider(self):
        """Start the StatsProvider service and register ourselves (we
        implement IStatsProducer) as one of its producers."""
        # the gatherer FURL is optional (None when no gatherer is configured)
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)
140 return { 'node.uptime': time.time() - self.started_timestamp }
142 def init_lease_secret(self):
143 secret_s = self.get_or_create_private_config("secret", _make_secret)
144 self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        """Optionally start a StorageServer and publish its FURL through the
        introducer. NOTE(review): several lines of this method are elided in
        this view (the disabled-case early return, the 'if data:' guard, the
        rest of the size-suffix multiplier table, and the _publish callback
        definition) -- confirm against the full source."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # optional size limit such as "5g" or "100MB": digits plus an
        # optional k/m/g scale and an optional trailing B
        data = self.get_config("storage", "sizelimit", None)
        m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
        log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
        number, suffix = m.groups()
        suffix = suffix.upper()
        if suffix.endswith("B"):
        "G": 1000 * 1000 * 1000,
        sizelimit = int(number) * multiplier
        discard = self.get_config("storage", "debug_discard", False,
        ss = StorageServer(storedir, sizelimit, discard, readonly,
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        # (the following lines appear to belong to an elided _publish
        # callback attached below)
        furl_file = os.path.join(self.basedir, "private", "storage.furl")
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
    def init_client(self):
        """Set up the core client services: Uploader, download cache,
        Downloader, MutableWatcher, and publish a StubClient so the
        introducer can see us."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        # config overrides mutate our per-instance copy (made in __init__),
        # never the class-level defaults
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        # persistent convergence secret: base32 on disk, raw bytes in memory
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        # weak cache so identical URIs map to the same live node object
        # without keeping unused nodes alive
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))

        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # NOTE(review): lines elided here (the rest of this comment and the
        # '_publish' callback header creating 'sc = StubClient()') -- confirm
        furl = self.tub.registerReference(sc)
        ri_name = RIStubClient.__remote_name__
        self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def init_control(self):
        """After Tub startup, attach a ControlServer and record its private
        FURL in BASEDIR/private/control.furl."""
        d = self.when_tub_ready()
        # NOTE(review): the '_publish' callback header and the line creating
        # 'c' (presumably 'c = ControlServer()') are elided here -- confirm
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """After Tub startup, attach a Helper service and register it so its
        FURL lands in BASEDIR/private/helper.furl."""
        d = self.when_tub_ready()
        # NOTE(review): the '_publish(res)' callback header is elided here
        h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
        h.setServiceParent(self)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl")
        self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
244 def init_key_gen(self, key_gen_furl):
245 d = self.when_tub_ready()
246 def _subscribe(self):
247 self.tub.connectTo(key_gen_furl, self._got_key_generator)
248 d.addCallback(_subscribe)
249 d.addErrback(log.err, facility="tahoe.init",
250 level=log.BAD, umid="z9DMzw")
    def _got_key_generator(self, key_generator):
        # connection to the remote key-generator service established (see
        # init_key_gen): remember the remote reference, and arrange for
        # _lost_key_generator to clear it when the connection drops
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
    def _lost_key_generator(self):
        # the remote key generator disconnected; _generate_pubprivkeys will
        # fall back to local RSA key generation while this is None
        self._key_generator = None
    def init_web(self, webport):
        """Create the web frontend listening on 'webport' (a Twisted
        strports description)."""
        self.log("init_web(webport=%s)", args=(webport,))

        # imported here rather than at module level, so nodes without the
        # web frontend do not pull in the webish machinery
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        # allow "~/..." paths in the web.static setting
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(webport, nodeurl_path, staticdir)
        # NOTE(review): the tail of this method (presumably attaching 'ws'
        # as a child service) is elided in this view -- confirm
    def init_ftp_server(self):
        """Start the optional FTP frontend, controlled by the [ftpd]
        config section."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            # imported here rather than at module level, so nodes without
            # the FTP frontend do not pull it in
            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)
    def init_sftp_server(self):
        """Start the optional SFTP frontend, controlled by the [sftpd]
        config section."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            # no defaults for the host key files: presumably required when
            # sftpd is enabled -- confirm get_config's missing-key behavior
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            # imported here rather than at module level, so nodes without
            # the SFTP frontend do not pull it in
            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
    def _check_hotline(self, hotline_file):
        """TimerService callback (scheduled once per second from __init__):
        keep running only while 'hotline_file' exists and was touched within
        the last 20 seconds. NOTE(review): the actual shutdown calls and the
        early 'return' on the keep-alive path are elided in this view --
        confirm against the full source."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
            self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
    def get_all_peerids(self):
        # pure delegation to our IntroducerClient
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        # pure delegation to our IntroducerClient
        return self.introducer_client.get_nickname_for_peerid(peerid)
    def get_permuted_peers(self, service_name, key):
        """Delegate to the IntroducerClient's permuted-peer list.

        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
    def get_encoding_parameters(self):
        # returns the live per-instance dict (possibly overridden from
        # config in init_client), not a copy
        return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        # True iff our IntroducerClient currently holds a connection to the
        # introducer. NOTE(review): the fallback when introducer_client is
        # falsy (presumably 'return False') is elided in this view
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
    def get_renewal_secret(self):
        # derive the client-wide lease-renewal secret from the master lease
        # secret loaded in init_lease_secret
        return hashutil.my_renewal_secret_hash(self._lease_secret)
    def get_cancel_secret(self):
        # derive the client-wide lease-cancel secret from the master lease
        # secret loaded in init_lease_secret
        return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the '_check()' function header is elided here; the
        # two lines below are its body (polled every 0.5s via PollMixin)
        current_clients = list(self.get_all_peerids())
        return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        # NOTE(review): the trailing 'return d' appears to be elided
343 # these four methods are the primitives for creating filenodes and
344 # dirnodes. The first takes a URI and produces a filenode or (new-style)
345 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        """Turn a URI into the corresponding filenode/dirnode object,
        memoized through the weak _node_cache so the same URI always yields
        the same live node."""
        # this returns synchronously.
        # NOTE(review): lines elided here (presumably normalizing 'u' via
        # IURI and computing the cache key 'u_s') -- confirm
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                # NOTE(review): the 'else:' introducing the CHK branch is
                # elided here
                key = base32.b2a(u.storage_index)
                cachefile = self.download_cache.get_file(key)
                node = FileNode(u, self, cachefile) # CHK
            # NOTE(review): the final 'else:' (mutable-file branch) is
            # elided here
            assert IMutableFileURI.providedBy(u), u
            node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
    def notify_publish(self, publish_status, size):
        # forward publish-status reports to the MutableWatcher service
        # NOTE(review): the continuation line closing this call (passing
        # 'size') is elided in this view
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
374 def notify_retrieve(self, retrieve_status):
375 self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
376 def notify_mapupdate(self, update_status):
377 self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
    def create_empty_dirnode(self):
        # create a brand-new directory node; the Deferred eventually fires
        # with the NewDirectoryNode once its keypair/creation completes
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        # NOTE(review): the trailing 'return d' appears to be elided here
    def create_mutable_file(self, contents=""):
        # create a brand-new mutable file holding 'contents'; the Deferred
        # eventually fires with the MutableFileNode once creation completes
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        # NOTE(review): the trailing 'return d' appears to be elided here
    def _generate_pubprivkeys(self, key_size):
        """Produce an RSA (verifying, signing) key pair of 'key_size' bits,
        using the remote key-generator service when one is connected and
        falling back to local generation otherwise. NOTE(review): the remote
        branch's return statements are elided in this view, so the remote
        path presumably returns a Deferred while the local path returns the
        tuple directly -- confirm against callers."""
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            # Python 2 tuple-unpacking parameter: receives the serialized
            # (verifying_key, signing_key) pair from the remote service
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                # NOTE(review): 'return (v, s)' appears to be elided here
            d.addCallback(make_key_objs)
            # NOTE(review): 'return d' appears to be elided here
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        signer = rsa.generate(key_size)
        verifier = signer.get_verifying_key()
        return verifier, signer
407 def upload(self, uploadable):
408 uploader = self.getServiceNamed("uploader")
409 return uploader.upload(uploadable)
412 def list_all_upload_statuses(self):
413 uploader = self.getServiceNamed("uploader")
414 return uploader.list_all_upload_statuses()
416 def list_all_download_statuses(self):
417 downloader = self.getServiceNamed("downloader")
418 return downloader.list_all_download_statuses()
420 def list_all_mapupdate_statuses(self):
421 watcher = self.getServiceNamed("mutable-watcher")
422 return watcher.list_all_mapupdate_statuses()
423 def list_all_publish_statuses(self):
424 watcher = self.getServiceNamed("mutable-watcher")
425 return watcher.list_all_publish_statuses()
426 def list_all_retrieve_statuses(self):
427 watcher = self.getServiceNamed("mutable-watcher")
428 return watcher.list_all_retrieve_statuses()
    def list_all_helper_statuses(self):
        # NOTE(review): the surrounding try/except (presumably handling the
        # KeyError raised when no "helper" service is running) is elided in
        # this view -- confirm against the full source
        helper = self.getServiceNamed("helper")
        return helper.get_all_upload_statuses()