2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.node import MutableFileNode, MutableWatcher
26 from allmydata.stats import StatsProvider
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
# A do-nothing remotely-referenceable object. It is published to the
# introducer so the introducer can count connected clients and show which
# versions they run (see init_client below).
class StubClient(Referenceable):
    implements(RIStubClient)
40 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    # name of the file (under basedir) holding our listening port number
    PORTNUMFILE = "client.port"
    # sentinel file polled by _check_hotline: while it stays fresh the node
    # keeps running; once stale or removed, the node shuts down
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"
    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 128*KiB,
    def __init__(self, basedir="."):
        """Build a Tahoe client node rooted at basedir and wire up its
        sub-services (introducer, stats, storage, frontends, web)."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # take a per-instance copy so config overrides don't mutate the
        # class-level default dict shared by all Client instances
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # run a helper only when explicitly enabled in the config
        if self.get_config("helper", "enabled", False, boolean=True):
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll once per second; _check_hotline shuts the node down when
            # the file goes stale or disappears
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)
        webport = self.get_config("node", "web.port", None)
            self.init_web(webport) # strports string
98 def read_old_config_files(self):
99 node.Node.read_old_config_files(self)
100 copy = self._copy_config_from_file
101 copy("introducer.furl", "client", "introducer.furl")
102 copy("helper.furl", "client", "helper.furl")
103 copy("key_generator.furl", "client", "key_generator.furl")
104 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
105 if os.path.exists(os.path.join(self.basedir, "no_storage")):
106 self.set_config("storage", "enabled", "false")
107 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
108 self.set_config("storage", "readonly", "true")
109 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
110 self.set_config("storage", "debug_discard", "true")
111 if os.path.exists(os.path.join(self.basedir, "run_helper")):
112 self.set_config("helper", "enabled", "true")
    def init_introducer_client(self):
        """Create the IntroducerClient and start it once our Tub is up."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        # startup failures are logged loudly but do not kill the node
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
133 def init_stats_provider(self):
134 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
135 self.stats_provider = StatsProvider(self, gatherer_furl)
136 self.add_service(self.stats_provider)
137 self.stats_provider.register_producer(self)
140 return { 'node.uptime': time.time() - self.started_timestamp }
142 def init_lease_secret(self):
143 secret_s = self.get_or_create_private_config("secret", _make_secret)
144 self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        """Optionally run a StorageServer and publish it via the introducer."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
        readonly = self.get_config("storage", "readonly", False, boolean=True)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # reserved_space is an abbreviated size string ("5G" etc.);
        # unparseable values are logged rather than treated as fatal
        data = self.get_config("storage", "reserved_space", None)
            reserved = parse_abbreviated_size(data)
            log.msg("[storage]reserved_space= contains unparseable value %s"
        discard = self.get_config("storage", "debug_discard", False,
        ss = StorageServer(storedir,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
    def init_client(self):
        """Set up upload/download machinery: encoding parameters, the
        convergence secret, the node cache, and the uploader / downloader /
        mutable-watcher services; then publish a StubClient for counting."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        # config may override k/n/happy; values arrive as strings
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        # per-client secret; created on first run, reused thereafter
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
    def init_control(self):
        """After Tub startup, register a ControlServer and record its FURL
        in BASEDIR/private/control.furl for local tooling."""
        d = self.when_tub_ready()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
    def init_helper(self):
        """After Tub startup, run a Helper service and write its FURL to
        BASEDIR/private/helper.furl."""
        d = self.when_tub_ready()
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
239 def init_key_gen(self, key_gen_furl):
240 d = self.when_tub_ready()
241 def _subscribe(self):
242 self.tub.connectTo(key_gen_furl, self._got_key_generator)
243 d.addCallback(_subscribe)
244 d.addErrback(log.err, facility="tahoe.init",
245 level=log.BAD, umid="z9DMzw")
247 def _got_key_generator(self, key_generator):
248 self._key_generator = key_generator
249 key_generator.notifyOnDisconnect(self._lost_key_generator)
251 def _lost_key_generator(self):
252 self._key_generator = None
    def init_web(self, webport):
        """Start the web frontend listening on `webport` (a strports string)."""
        self.log("init_web(webport=%s)", args=(webport,))
        # imported lazily so web dependencies are only needed when enabled
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(webport, nodeurl_path, staticdir)
    def init_ftp_server(self):
        """Start an FTP frontend when [ftpd]enabled = true."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")
            # imported lazily so FTP dependencies are only needed when enabled
            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)
    def init_sftp_server(self):
        """Start an SFTP frontend when [sftpd]enabled = true."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            # host key paths have no default — presumably required whenever
            # sftpd is enabled; TODO confirm get_config's no-default behavior
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")
            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
    def _check_hotline(self, hotline_file):
        """TimerService callback (once per second, see __init__): keep
        running while the hotline file is fresh (<20s old); log and shut
        down otherwise. Some branches of this method are elided here."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                self.log("hotline file too old, shutting down")
            self.log("hotline file missing, shutting down")
298 def get_all_peerids(self):
299 return self.introducer_client.get_all_peerids()
300 def get_nickname_for_peerid(self, peerid):
301 return self.introducer_client.get_nickname_for_peerid(peerid)
    def get_permuted_peers(self, service_name, key):
        @return: list of (peerid, connection,)
        # both arguments must be bytestrings in this (python2) codebase
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
311 def get_encoding_parameters(self):
312 return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        """Report whether our introducer connection is currently up (the
        no-introducer-client branch is elided in this view)."""
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
319 def get_renewal_secret(self):
320 return hashutil.my_renewal_secret_hash(self._lease_secret)
322 def get_cancel_secret(self):
323 return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
            # polled predicate: true once enough peers are connected
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        # re-check every 0.5s until the predicate holds
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
338 # these four methods are the primitives for creating filenodes and
339 # dirnodes. The first takes a URI and produces a filenode or (new-style)
340 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        """Map a URI onto the matching node type (dirnode, literal/CHK
        filenode, or mutable filenode), memoized per URI string in
        self._node_cache. NOTE(review): the setup of u_s and several else:
        lines are elided in this view."""
        # this returns synchronously.
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                    # CHK files get a per-storage-index download cache file
                    key = base32.b2a(u.storage_index)
                    cachefile = self.download_cache.get_file(key)
                    node = FileNode(u, self, cachefile) # CHK
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
    def notify_publish(self, publish_status, size):
        # forward mutable-file publish events to the mutable-watcher service
        # (the call's continuation line is elided in this view)
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
369 def notify_retrieve(self, retrieve_status):
370 self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
371 def notify_mapupdate(self, update_status):
372 self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
    def create_empty_dirnode(self):
        """Create a brand-new empty mutable directory node; key generation
        is delegated to _generate_pubprivkeys (possibly remote). The
        Deferred fires with the new node."""
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
    def create_mutable_file(self, contents=""):
        """Create a brand-new mutable file holding `contents`; key
        generation is delegated to _generate_pubprivkeys (possibly remote).
        The Deferred fires with the new node."""
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
    def _generate_pubprivkeys(self, key_size):
        """Produce an RSA (verifying_key, signing_key) pair of key_size
        bits: remotely via the connected key-generator service when one is
        available, else synchronously with pycryptopp."""
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                # rehydrate the serialized keys into pycryptopp objects
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
            d.addCallback(make_key_objs)
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer
402 def upload(self, uploadable):
403 uploader = self.getServiceNamed("uploader")
404 return uploader.upload(uploadable)
407 def list_all_upload_statuses(self):
408 uploader = self.getServiceNamed("uploader")
409 return uploader.list_all_upload_statuses()
411 def list_all_download_statuses(self):
412 downloader = self.getServiceNamed("downloader")
413 return downloader.list_all_download_statuses()
415 def list_all_mapupdate_statuses(self):
416 watcher = self.getServiceNamed("mutable-watcher")
417 return watcher.list_all_mapupdate_statuses()
418 def list_all_publish_statuses(self):
419 watcher = self.getServiceNamed("mutable-watcher")
420 return watcher.list_all_publish_statuses()
421 def list_all_retrieve_statuses(self):
422 watcher = self.getServiceNamed("mutable-watcher")
423 return watcher.list_all_retrieve_statuses()
    def list_all_helper_statuses(self):
        """Report the helper service's upload statuses when a helper is
        running (the service-lookup/fallback branch is elided here)."""
            helper = self.getServiceNamed("helper")
            return helper.get_all_upload_statuses()