# Captured from gitweb (git.rkrishnan.org): tahoe-lafs/tahoe-lafs.git, blob src/allmydata/client.py
# Commit subject: versioning: include an "appname" in the application version string in the versioning...
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.filenode import MutableFileNode
25 from allmydata.stats import StatsProvider
26 from allmydata.history import History
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
29
# Binary (IEC) size units, used for human-readable sizes such as the
# default max_segment_size below.
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
35
class StubClient(Referenceable):
    """An empty remotely-referenceable object.

    Published under the "stub_client" service name (see init_client) so the
    introducer can count connected clients and see their versions, even for
    nodes that offer no storage.
    """
    implements(RIStubClient)
38
def _make_secret():
    """Return a fresh random secret, base32-encoded with a trailing newline.

    Used as the default factory for private config files such as "secret"
    and "convergence".
    """
    entropy = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(entropy) + "\n"
41
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: runs uploader/downloader services, optionally a
    storage server and helper, and connects to the introducer."""
    implements(IStatsProducer)

    # filename (under basedir) holding our listening port number
    PORTNUMFILE = "client.port"
    # subdirectory (under basedir) where the storage server keeps shares
    STOREDIR = 'storage'
    NODETYPE = "client"
    # if this file exists in basedir, the node shuts down when it goes stale
    # (see __init__ and _check_hotline)
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
64
    def __init__(self, basedir="."):
        """Build the client node rooted at *basedir*.

        Initialization order matters: the introducer client must exist
        before init_storage/init_client publish services through it, and
        the stats provider must exist before services that register with it.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # for the node.uptime stat
        self.logSource="Client"
        # per-instance copy so init_client can override k/n/happy from config
        # without mutating the class-level defaults
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists, shut down automatically once it stops
        # being touched (used by the test harness to reap orphaned nodes)
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
97
98     def read_old_config_files(self):
99         node.Node.read_old_config_files(self)
100         copy = self._copy_config_from_file
101         copy("introducer.furl", "client", "introducer.furl")
102         copy("helper.furl", "client", "helper.furl")
103         copy("key_generator.furl", "client", "key_generator.furl")
104         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
105         if os.path.exists(os.path.join(self.basedir, "no_storage")):
106             self.set_config("storage", "enabled", "false")
107         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
108             self.set_config("storage", "readonly", "true")
109         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
110             self.set_config("storage", "debug_discard", "true")
111         if os.path.exists(os.path.join(self.basedir, "run_helper")):
112             self.set_config("helper", "enabled", "true")
113
114     def init_introducer_client(self):
115         self.introducer_furl = self.get_config("client", "introducer.furl")
116         ic = IntroducerClient(self.tub, self.introducer_furl,
117                               self.nickname,
118                               str(allmydata.__full_version__),
119                               str(self.OLDEST_SUPPORTED_VERSION))
120         self.introducer_client = ic
121         # hold off on starting the IntroducerClient until our tub has been
122         # started, so we'll have a useful address on our RemoteReference, so
123         # that the introducer's status page will show us.
124         d = self.when_tub_ready()
125         def _start_introducer_client(res):
126             ic.setServiceParent(self)
127             # nodes that want to upload and download will need storage servers
128             ic.subscribe_to("storage")
129         d.addCallback(_start_introducer_client)
130         d.addErrback(log.err, facility="tahoe.init",
131                      level=log.BAD, umid="URyI5w")
132
133     def init_stats_provider(self):
134         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
135         self.stats_provider = StatsProvider(self, gatherer_furl)
136         self.add_service(self.stats_provider)
137         self.stats_provider.register_producer(self)
138
139     def get_stats(self):
140         return { 'node.uptime': time.time() - self.started_timestamp }
141
142     def init_lease_secret(self):
143         secret_s = self.get_or_create_private_config("secret", _make_secret)
144         self._lease_secret = base32.a2b(secret_s)
145
146     def init_storage(self):
147         # should we run a storage server (and publish it for others to use)?
148         if not self.get_config("storage", "enabled", True, boolean=True):
149             return
150         readonly = self.get_config("storage", "readonly", False, boolean=True)
151
152         storedir = os.path.join(self.basedir, self.STOREDIR)
153
154         data = self.get_config("storage", "reserved_space", None)
155         reserved = None
156         try:
157             reserved = parse_abbreviated_size(data)
158         except ValueError:
159             log.msg("[storage]reserved_space= contains unparseable value %s"
160                     % data)
161         if reserved is None:
162             reserved = 0
163         discard = self.get_config("storage", "debug_discard", False,
164                                   boolean=True)
165         ss = StorageServer(storedir,
166                            reserved_space=reserved,
167                            discard_storage=discard,
168                            readonly_storage=readonly,
169                            stats_provider=self.stats_provider)
170         self.add_service(ss)
171         d = self.when_tub_ready()
172         # we can't do registerReference until the Tub is ready
173         def _publish(res):
174             furl_file = os.path.join(self.basedir, "private", "storage.furl")
175             furl = self.tub.registerReference(ss, furlFile=furl_file)
176             ri_name = RIStorageServer.__remote_name__
177             self.introducer_client.publish(furl, "storage", ri_name)
178         d.addCallback(_publish)
179         d.addErrback(log.err, facility="tahoe.init",
180                      level=log.BAD, umid="aLGBKw")
181
182     def init_client(self):
183         helper_furl = self.get_config("client", "helper.furl", None)
184         DEP = self.DEFAULT_ENCODING_PARAMETERS
185         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
186         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
187         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
188         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
189         self.convergence = base32.a2b(convergence_s)
190         self._node_cache = weakref.WeakValueDictionary() # uri -> node
191         self.add_service(History(self.stats_provider))
192         self.add_service(Uploader(helper_furl, self.stats_provider))
193         download_cachedir = os.path.join(self.basedir,
194                                          "private", "cache", "download")
195         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
196         self.download_cache.setServiceParent(self)
197         self.add_service(Downloader(self.stats_provider))
198         def _publish(res):
199             # we publish an empty object so that the introducer can count how
200             # many clients are connected and see what versions they're
201             # running.
202             sc = StubClient()
203             furl = self.tub.registerReference(sc)
204             ri_name = RIStubClient.__remote_name__
205             self.introducer_client.publish(furl, "stub_client", ri_name)
206         d = self.when_tub_ready()
207         d.addCallback(_publish)
208         d.addErrback(log.err, facility="tahoe.init",
209                      level=log.BAD, umid="OEHq3g")
210
211     def get_history(self):
212         return self.getServiceNamed("history")
213
214     def init_control(self):
215         d = self.when_tub_ready()
216         def _publish(res):
217             c = ControlServer()
218             c.setServiceParent(self)
219             control_url = self.tub.registerReference(c)
220             self.write_private_config("control.furl", control_url + "\n")
221         d.addCallback(_publish)
222         d.addErrback(log.err, facility="tahoe.init",
223                      level=log.BAD, umid="d3tNXA")
224
225     def init_helper(self):
226         d = self.when_tub_ready()
227         def _publish(self):
228             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
229             h.setServiceParent(self)
230             # TODO: this is confusing. BASEDIR/private/helper.furl is created
231             # by the helper. BASEDIR/helper.furl is consumed by the client
232             # who wants to use the helper. I like having the filename be the
233             # same, since that makes 'cp' work smoothly, but the difference
234             # between config inputs and generated outputs is hard to see.
235             helper_furlfile = os.path.join(self.basedir,
236                                            "private", "helper.furl")
237             self.tub.registerReference(h, furlFile=helper_furlfile)
238         d.addCallback(_publish)
239         d.addErrback(log.err, facility="tahoe.init",
240                      level=log.BAD, umid="K0mW5w")
241
242     def init_key_gen(self, key_gen_furl):
243         d = self.when_tub_ready()
244         def _subscribe(self):
245             self.tub.connectTo(key_gen_furl, self._got_key_generator)
246         d.addCallback(_subscribe)
247         d.addErrback(log.err, facility="tahoe.init",
248                      level=log.BAD, umid="z9DMzw")
249
250     def _got_key_generator(self, key_generator):
251         self._key_generator = key_generator
252         key_generator.notifyOnDisconnect(self._lost_key_generator)
253
254     def _lost_key_generator(self):
255         self._key_generator = None
256
257     def get_servers(self, service_name):
258         """ Return set of (peerid, versioned-rref) """
259         assert isinstance(service_name, str)
260         return self.introducer_client.get_peers(service_name)
261
262     def init_web(self, webport):
263         self.log("init_web(webport=%s)", args=(webport,))
264
265         from allmydata.webish import WebishServer
266         nodeurl_path = os.path.join(self.basedir, "node.url")
267         staticdir = self.get_config("node", "web.static", "public_html")
268         staticdir = os.path.expanduser(staticdir)
269         ws = WebishServer(webport, nodeurl_path, staticdir)
270         self.add_service(ws)
271
272     def init_ftp_server(self):
273         if self.get_config("ftpd", "enabled", False, boolean=True):
274             accountfile = self.get_config("ftpd", "accounts.file", None)
275             accounturl = self.get_config("ftpd", "accounts.url", None)
276             ftp_portstr = self.get_config("ftpd", "port", "8021")
277
278             from allmydata.frontends import ftpd
279             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
280             s.setServiceParent(self)
281
282     def init_sftp_server(self):
283         if self.get_config("sftpd", "enabled", False, boolean=True):
284             accountfile = self.get_config("sftpd", "accounts.file", None)
285             accounturl = self.get_config("sftpd", "accounts.url", None)
286             sftp_portstr = self.get_config("sftpd", "port", "8022")
287             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
288             privkey_file = self.get_config("sftpd", "host_privkey_file")
289
290             from allmydata.frontends import sftpd
291             s = sftpd.SFTPServer(self, accountfile, accounturl,
292                                  sftp_portstr, pubkey_file, privkey_file)
293             s.setServiceParent(self)
294
295     def _check_hotline(self, hotline_file):
296         if os.path.exists(hotline_file):
297             mtime = os.stat(hotline_file)[stat.ST_MTIME]
298             if mtime > time.time() - 40.0:
299                 return
300             else:
301                 self.log("hotline file too old, shutting down")
302         else:
303             self.log("hotline file missing, shutting down")
304         reactor.stop()
305
306     def get_all_peerids(self):
307         return self.introducer_client.get_all_peerids()
308     def get_nickname_for_peerid(self, peerid):
309         return self.introducer_client.get_nickname_for_peerid(peerid)
310
311     def get_permuted_peers(self, service_name, key):
312         """
313         @return: list of (peerid, connection,)
314         """
315         assert isinstance(service_name, str)
316         assert isinstance(key, str)
317         return self.introducer_client.get_permuted_peers(service_name, key)
318
319     def get_encoding_parameters(self):
320         return self.DEFAULT_ENCODING_PARAMETERS
321
322     def connected_to_introducer(self):
323         if self.introducer_client:
324             return self.introducer_client.connected_to_introducer()
325         return False
326
327     def get_renewal_secret(self):
328         return hashutil.my_renewal_secret_hash(self._lease_secret)
329
330     def get_cancel_secret(self):
331         return hashutil.my_cancel_secret_hash(self._lease_secret)
332
333     def debug_wait_for_client_connections(self, num_clients):
334         """Return a Deferred that fires (with None) when we have connections
335         to the given number of peers. Useful for tests that set up a
336         temporary test network and need to know when it is safe to proceed
337         with an upload or download."""
338         def _check():
339             current_clients = list(self.get_all_peerids())
340             return len(current_clients) >= num_clients
341         d = self.poll(_check, 0.5)
342         d.addCallback(lambda res: None)
343         return d
344
345
346     # these four methods are the primitives for creating filenodes and
347     # dirnodes. The first takes a URI and produces a filenode or (new-style)
348     # dirnode. The other three create brand-new filenodes/dirnodes.
349
350     def create_node_from_uri(self, u):
351         # this returns synchronously.
352         u = IURI(u)
353         u_s = u.to_string()
354         if u_s not in self._node_cache:
355             if IReadonlyNewDirectoryURI.providedBy(u):
356                 # new-style read-only dirnodes
357                 node = NewDirectoryNode(self).init_from_uri(u)
358             elif INewDirectoryURI.providedBy(u):
359                 # new-style dirnodes
360                 node = NewDirectoryNode(self).init_from_uri(u)
361             elif IFileURI.providedBy(u):
362                 if isinstance(u, LiteralFileURI):
363                     node = LiteralFileNode(u, self) # LIT
364                 else:
365                     key = base32.b2a(u.storage_index)
366                     cachefile = self.download_cache.get_file(key)
367                     node = FileNode(u, self, cachefile) # CHK
368             else:
369                 assert IMutableFileURI.providedBy(u), u
370                 node = MutableFileNode(self).init_from_uri(u)
371             self._node_cache[u_s] = node
372         return self._node_cache[u_s]
373
374     def create_empty_dirnode(self):
375         n = NewDirectoryNode(self)
376         d = n.create(self._generate_pubprivkeys)
377         d.addCallback(lambda res: n)
378         return d
379
380     def create_mutable_file(self, contents=""):
381         n = MutableFileNode(self)
382         d = n.create(contents, self._generate_pubprivkeys)
383         d.addCallback(lambda res: n)
384         return d
385
386     def _generate_pubprivkeys(self, key_size):
387         if self._key_generator:
388             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
389             def make_key_objs((verifying_key, signing_key)):
390                 v = rsa.create_verifying_key_from_string(verifying_key)
391                 s = rsa.create_signing_key_from_string(signing_key)
392                 return v, s
393             d.addCallback(make_key_objs)
394             return d
395         else:
396             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
397             # secs
398             signer = rsa.generate(key_size)
399             verifier = signer.get_verifying_key()
400             return verifier, signer
401
402     def upload(self, uploadable):
403         uploader = self.getServiceNamed("uploader")
404         return uploader.upload(uploadable, history=self.get_history())
405
406
407     def list_all_upload_statuses(self):
408         return self.get_history().list_all_upload_statuses()
409
410     def list_all_download_statuses(self):
411         return self.get_history().list_all_download_statuses()
412
413     def list_all_mapupdate_statuses(self):
414         return self.get_history().list_all_mapupdate_statuses()
415     def list_all_publish_statuses(self):
416         return self.get_history().list_all_publish_statuses()
417     def list_all_retrieve_statuses(self):
418         return self.get_history().list_all_retrieve_statuses()
419
420     def list_all_helper_statuses(self):
421         try:
422             helper = self.getServiceNamed("helper")
423         except KeyError:
424             return []
425         return helper.get_all_upload_statuses()