]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
nodeadmin: node stops itself if a hotline file hasn't been touched in 120 seconds...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.filenode import MutableFileNode
25 from allmydata.stats import StatsProvider
26 from allmydata.history import History
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
29
30 KiB=1024
31 MiB=1024*KiB
32 GiB=1024*MiB
33 TiB=1024*GiB
34 PiB=1024*TiB
35
36 class StubClient(Referenceable):
37     implements(RIStubClient)
38
39 def _make_secret():
40     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
41
42 class Client(node.Node, pollmixin.PollMixin):
43     implements(IStatsProducer)
44
45     PORTNUMFILE = "client.port"
46     STOREDIR = 'storage'
47     NODETYPE = "client"
48     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
49
50     # This means that if a storage server treats me as though I were a
51     # 1.0.0 storage client, it will work as they expect.
52     OLDEST_SUPPORTED_VERSION = "1.0.0"
53
54     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
55     # is the number of shares required to reconstruct a file. 'desired' means
56     # that we will abort an upload unless we can allocate space for at least
57     # this many. 'total' is the total number of shares created by encoding.
58     # If everybody has room then this is is how many we will upload.
59     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
60                                    "happy": 7,
61                                    "n": 10,
62                                    "max_segment_size": 128*KiB,
63                                    }
64
65     def __init__(self, basedir="."):
66         node.Node.__init__(self, basedir)
67         self.started_timestamp = time.time()
68         self.logSource="Client"
69         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
70         self.init_introducer_client()
71         self.init_stats_provider()
72         self.init_lease_secret()
73         self.init_storage()
74         self.init_control()
75         if self.get_config("helper", "enabled", False, boolean=True):
76             self.init_helper()
77         self.init_client()
78         self._key_generator = None
79         key_gen_furl = self.get_config("client", "key_generator.furl", None)
80         if key_gen_furl:
81             self.init_key_gen(key_gen_furl)
82         # ControlServer and Helper are attached after Tub startup
83         self.init_ftp_server()
84         self.init_sftp_server()
85
86         hotline_file = os.path.join(self.basedir,
87                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
88         if os.path.exists(hotline_file):
89             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
90             self.log("hotline file noticed (%ds old), starting timer" % age)
91             hotline = TimerService(1.0, self._check_hotline, hotline_file)
92             hotline.setServiceParent(self)
93
94         # this needs to happen last, so it can use getServiceNamed() to
95         # acquire references to StorageServer and other web-statusable things
96         webport = self.get_config("node", "web.port", None)
97         if webport:
98             self.init_web(webport) # strports string
99
100     def read_old_config_files(self):
101         node.Node.read_old_config_files(self)
102         copy = self._copy_config_from_file
103         copy("introducer.furl", "client", "introducer.furl")
104         copy("helper.furl", "client", "helper.furl")
105         copy("key_generator.furl", "client", "key_generator.furl")
106         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
107         if os.path.exists(os.path.join(self.basedir, "no_storage")):
108             self.set_config("storage", "enabled", "false")
109         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
110             self.set_config("storage", "readonly", "true")
111         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
112             self.set_config("storage", "debug_discard", "true")
113         if os.path.exists(os.path.join(self.basedir, "run_helper")):
114             self.set_config("helper", "enabled", "true")
115
116     def init_introducer_client(self):
117         self.introducer_furl = self.get_config("client", "introducer.furl")
118         ic = IntroducerClient(self.tub, self.introducer_furl,
119                               self.nickname,
120                               str(allmydata.__full_version__),
121                               str(self.OLDEST_SUPPORTED_VERSION))
122         self.introducer_client = ic
123         # hold off on starting the IntroducerClient until our tub has been
124         # started, so we'll have a useful address on our RemoteReference, so
125         # that the introducer's status page will show us.
126         d = self.when_tub_ready()
127         def _start_introducer_client(res):
128             ic.setServiceParent(self)
129             # nodes that want to upload and download will need storage servers
130             ic.subscribe_to("storage")
131         d.addCallback(_start_introducer_client)
132         d.addErrback(log.err, facility="tahoe.init",
133                      level=log.BAD, umid="URyI5w")
134
135     def init_stats_provider(self):
136         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
137         self.stats_provider = StatsProvider(self, gatherer_furl)
138         self.add_service(self.stats_provider)
139         self.stats_provider.register_producer(self)
140
141     def get_stats(self):
142         return { 'node.uptime': time.time() - self.started_timestamp }
143
144     def init_lease_secret(self):
145         secret_s = self.get_or_create_private_config("secret", _make_secret)
146         self._lease_secret = base32.a2b(secret_s)
147
148     def init_storage(self):
149         # should we run a storage server (and publish it for others to use)?
150         if not self.get_config("storage", "enabled", True, boolean=True):
151             return
152         readonly = self.get_config("storage", "readonly", False, boolean=True)
153
154         storedir = os.path.join(self.basedir, self.STOREDIR)
155
156         data = self.get_config("storage", "reserved_space", None)
157         reserved = None
158         try:
159             reserved = parse_abbreviated_size(data)
160         except ValueError:
161             log.msg("[storage]reserved_space= contains unparseable value %s"
162                     % data)
163         if reserved is None:
164             reserved = 0
165         discard = self.get_config("storage", "debug_discard", False,
166                                   boolean=True)
167         ss = StorageServer(storedir, self.nodeid,
168                            reserved_space=reserved,
169                            discard_storage=discard,
170                            readonly_storage=readonly,
171                            stats_provider=self.stats_provider)
172         self.add_service(ss)
173         d = self.when_tub_ready()
174         # we can't do registerReference until the Tub is ready
175         def _publish(res):
176             furl_file = os.path.join(self.basedir, "private", "storage.furl")
177             furl = self.tub.registerReference(ss, furlFile=furl_file)
178             ri_name = RIStorageServer.__remote_name__
179             self.introducer_client.publish(furl, "storage", ri_name)
180         d.addCallback(_publish)
181         d.addErrback(log.err, facility="tahoe.init",
182                      level=log.BAD, umid="aLGBKw")
183
184     def init_client(self):
185         helper_furl = self.get_config("client", "helper.furl", None)
186         DEP = self.DEFAULT_ENCODING_PARAMETERS
187         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
188         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
189         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
190         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
191         self.convergence = base32.a2b(convergence_s)
192         self._node_cache = weakref.WeakValueDictionary() # uri -> node
193         self.add_service(History(self.stats_provider))
194         self.add_service(Uploader(helper_furl, self.stats_provider))
195         download_cachedir = os.path.join(self.basedir,
196                                          "private", "cache", "download")
197         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
198         self.download_cache.setServiceParent(self)
199         self.add_service(Downloader(self.stats_provider))
200         self.init_stub_client()
201
202     def init_stub_client(self):
203         def _publish(res):
204             # we publish an empty object so that the introducer can count how
205             # many clients are connected and see what versions they're
206             # running.
207             sc = StubClient()
208             furl = self.tub.registerReference(sc)
209             ri_name = RIStubClient.__remote_name__
210             self.introducer_client.publish(furl, "stub_client", ri_name)
211         d = self.when_tub_ready()
212         d.addCallback(_publish)
213         d.addErrback(log.err, facility="tahoe.init",
214                      level=log.BAD, umid="OEHq3g")
215
216     def get_history(self):
217         return self.getServiceNamed("history")
218
219     def init_control(self):
220         d = self.when_tub_ready()
221         def _publish(res):
222             c = ControlServer()
223             c.setServiceParent(self)
224             control_url = self.tub.registerReference(c)
225             self.write_private_config("control.furl", control_url + "\n")
226         d.addCallback(_publish)
227         d.addErrback(log.err, facility="tahoe.init",
228                      level=log.BAD, umid="d3tNXA")
229
230     def init_helper(self):
231         d = self.when_tub_ready()
232         def _publish(self):
233             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
234             h.setServiceParent(self)
235             # TODO: this is confusing. BASEDIR/private/helper.furl is created
236             # by the helper. BASEDIR/helper.furl is consumed by the client
237             # who wants to use the helper. I like having the filename be the
238             # same, since that makes 'cp' work smoothly, but the difference
239             # between config inputs and generated outputs is hard to see.
240             helper_furlfile = os.path.join(self.basedir,
241                                            "private", "helper.furl")
242             self.tub.registerReference(h, furlFile=helper_furlfile)
243         d.addCallback(_publish)
244         d.addErrback(log.err, facility="tahoe.init",
245                      level=log.BAD, umid="K0mW5w")
246
247     def init_key_gen(self, key_gen_furl):
248         d = self.when_tub_ready()
249         def _subscribe(self):
250             self.tub.connectTo(key_gen_furl, self._got_key_generator)
251         d.addCallback(_subscribe)
252         d.addErrback(log.err, facility="tahoe.init",
253                      level=log.BAD, umid="z9DMzw")
254
255     def _got_key_generator(self, key_generator):
256         self._key_generator = key_generator
257         key_generator.notifyOnDisconnect(self._lost_key_generator)
258
259     def _lost_key_generator(self):
260         self._key_generator = None
261
262     def get_servers(self, service_name):
263         """ Return frozenset of (peerid, versioned-rref) """
264         assert isinstance(service_name, str)
265         return self.introducer_client.get_peers(service_name)
266
267     def init_web(self, webport):
268         self.log("init_web(webport=%s)", args=(webport,))
269
270         from allmydata.webish import WebishServer
271         nodeurl_path = os.path.join(self.basedir, "node.url")
272         staticdir = self.get_config("node", "web.static", "public_html")
273         staticdir = os.path.expanduser(staticdir)
274         ws = WebishServer(self, webport, nodeurl_path, staticdir)
275         self.add_service(ws)
276
277     def init_ftp_server(self):
278         if self.get_config("ftpd", "enabled", False, boolean=True):
279             accountfile = self.get_config("ftpd", "accounts.file", None)
280             accounturl = self.get_config("ftpd", "accounts.url", None)
281             ftp_portstr = self.get_config("ftpd", "port", "8021")
282
283             from allmydata.frontends import ftpd
284             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
285             s.setServiceParent(self)
286
287     def init_sftp_server(self):
288         if self.get_config("sftpd", "enabled", False, boolean=True):
289             accountfile = self.get_config("sftpd", "accounts.file", None)
290             accounturl = self.get_config("sftpd", "accounts.url", None)
291             sftp_portstr = self.get_config("sftpd", "port", "8022")
292             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
293             privkey_file = self.get_config("sftpd", "host_privkey_file")
294
295             from allmydata.frontends import sftpd
296             s = sftpd.SFTPServer(self, accountfile, accounturl,
297                                  sftp_portstr, pubkey_file, privkey_file)
298             s.setServiceParent(self)
299
300     def _check_hotline(self, hotline_file):
301         if os.path.exists(hotline_file):
302             mtime = os.stat(hotline_file)[stat.ST_MTIME]
303             if mtime > time.time() - 120.0:
304                 return
305             else:
306                 self.log("hotline file too old, shutting down")
307         else:
308             self.log("hotline file missing, shutting down")
309         reactor.stop()
310
311     def get_all_peerids(self):
312         return self.introducer_client.get_all_peerids()
313     def get_nickname_for_peerid(self, peerid):
314         return self.introducer_client.get_nickname_for_peerid(peerid)
315
316     def get_permuted_peers(self, service_name, key):
317         """
318         @return: list of (peerid, connection,)
319         """
320         assert isinstance(service_name, str)
321         assert isinstance(key, str)
322         return self.introducer_client.get_permuted_peers(service_name, key)
323
324     def get_encoding_parameters(self):
325         return self.DEFAULT_ENCODING_PARAMETERS
326
327     def connected_to_introducer(self):
328         if self.introducer_client:
329             return self.introducer_client.connected_to_introducer()
330         return False
331
332     def get_renewal_secret(self):
333         return hashutil.my_renewal_secret_hash(self._lease_secret)
334
335     def get_cancel_secret(self):
336         return hashutil.my_cancel_secret_hash(self._lease_secret)
337
338     def debug_wait_for_client_connections(self, num_clients):
339         """Return a Deferred that fires (with None) when we have connections
340         to the given number of peers. Useful for tests that set up a
341         temporary test network and need to know when it is safe to proceed
342         with an upload or download."""
343         def _check():
344             current_clients = list(self.get_all_peerids())
345             return len(current_clients) >= num_clients
346         d = self.poll(_check, 0.5)
347         d.addCallback(lambda res: None)
348         return d
349
350
351     # these four methods are the primitives for creating filenodes and
352     # dirnodes. The first takes a URI and produces a filenode or (new-style)
353     # dirnode. The other three create brand-new filenodes/dirnodes.
354
355     def create_node_from_uri(self, u):
356         # this returns synchronously.
357         u = IURI(u)
358         u_s = u.to_string()
359         if u_s not in self._node_cache:
360             if IReadonlyNewDirectoryURI.providedBy(u):
361                 # new-style read-only dirnodes
362                 node = NewDirectoryNode(self).init_from_uri(u)
363             elif INewDirectoryURI.providedBy(u):
364                 # new-style dirnodes
365                 node = NewDirectoryNode(self).init_from_uri(u)
366             elif IFileURI.providedBy(u):
367                 if isinstance(u, LiteralFileURI):
368                     node = LiteralFileNode(u, self) # LIT
369                 else:
370                     key = base32.b2a(u.storage_index)
371                     cachefile = self.download_cache.get_file(key)
372                     node = FileNode(u, self, cachefile) # CHK
373             else:
374                 assert IMutableFileURI.providedBy(u), u
375                 node = MutableFileNode(self).init_from_uri(u)
376             self._node_cache[u_s] = node
377         return self._node_cache[u_s]
378
379     def create_empty_dirnode(self):
380         n = NewDirectoryNode(self)
381         d = n.create(self._generate_pubprivkeys)
382         d.addCallback(lambda res: n)
383         return d
384
385     def create_mutable_file(self, contents=""):
386         n = MutableFileNode(self)
387         d = n.create(contents, self._generate_pubprivkeys)
388         d.addCallback(lambda res: n)
389         return d
390
391     def _generate_pubprivkeys(self, key_size):
392         if self._key_generator:
393             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
394             def make_key_objs((verifying_key, signing_key)):
395                 v = rsa.create_verifying_key_from_string(verifying_key)
396                 s = rsa.create_signing_key_from_string(signing_key)
397                 return v, s
398             d.addCallback(make_key_objs)
399             return d
400         else:
401             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
402             # secs
403             signer = rsa.generate(key_size)
404             verifier = signer.get_verifying_key()
405             return verifier, signer
406
407     def upload(self, uploadable):
408         uploader = self.getServiceNamed("uploader")
409         return uploader.upload(uploadable, history=self.get_history())
410
411
412     def list_all_upload_statuses(self):
413         return self.get_history().list_all_upload_statuses()
414
415     def list_all_download_statuses(self):
416         return self.get_history().list_all_download_statuses()
417
418     def list_all_mapupdate_statuses(self):
419         return self.get_history().list_all_mapupdate_statuses()
420     def list_all_publish_statuses(self):
421         return self.get_history().list_all_publish_statuses()
422     def list_all_retrieve_statuses(self):
423         return self.get_history().list_all_retrieve_statuses()
424
425     def list_all_helper_statuses(self):
426         try:
427             helper = self.getServiceNamed("helper")
428         except KeyError:
429             return []
430         return helper.get_all_upload_statuses()