
import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap import Referenceable
from foolscap.logging import log
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage import StorageServer
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient

KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, everything will work as that server expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of encoding parameters: 'k' (needed), 'happy' (desired),
    # 'n' (total), and 'max_segment_size'. 'k' is the number of shares
    # required to reconstruct a file. 'happy' means that we will abort an
    # upload unless we can allocate space for at least this many shares.
    # 'n' is the total number of shares created by encoding. If everybody
    # has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
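
    # For illustration, these defaults can be overridden per-node from the
    # [client] section of tahoe.cfg, which init_client() below reads via
    # get_config(). Example (the values shown are just the defaults again):
    #
    #   [client]
    #   shares.needed = 3
    #   shares.happy = 7
    #   shares.total = 10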

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def read_old_config_files(self):
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")
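
        # These set_config() calls translate the legacy per-file markers in
        # BASEDIR into the newer section/key form that the get_config()
        # calls elsewhere in this class expect, e.g. the presence of a
        # 'no_storage' file becomes [storage]enabled=false and 'run_helper'
        # becomes [helper]enabled=true.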

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so that we'll have a useful address on our RemoteReference
        # and the introducer's status page will be able to show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
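
        # For reference, introducer.furl comes from the [client] section of
        # tahoe.cfg and is a foolscap FURL; an illustrative (made-up) value
        # looks like pb://<tubid>@<host>:<port>/introducer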

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
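
        # Illustrative [storage] settings read above (example values only;
        # reserved_space accepts abbreviated sizes understood by
        # parse_abbreviated_size, such as "1G"):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 1G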

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
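
        # A client that wants to *use* a helper (rather than run one) points
        # at it from the [client] section of tahoe.cfg, which init_client()
        # reads via get_config("client", "helper.furl"). Illustrative
        # (made-up) value:
        #
        #   [client]
        #   helper.furl = pb://<tubid>@<host>:<port>/<swissnum>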

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def get_servers(self, service_name):
        """ Return set of (peerid, versioned-rref) """
        assert isinstance(service_name, str)
        return self.introducer_client.get_peers(service_name)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(webport, nodeurl_path, staticdir)
        self.add_service(ws)
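
        # webport is a Twisted strports description taken from
        # [node]web.port in tahoe.cfg; illustrative values are "tcp:3456"
        # or "tcp:3456:interface=127.0.0.1".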

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
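
        # Illustrative [sftpd] configuration using the keys read above
        # (values and key-file paths are made-up examples; the [ftpd]
        # section is analogous, minus the host key files):
        #
        #   [sftpd]
        #   enabled = true
        #   port = 8022
        #   host_pubkey_file = private/ssh_host_rsa_key.pub
        #   host_privkey_file = private/ssh_host_rsa_key
        #   accounts.file = private/sftp_accounts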

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 40.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
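
        # The "suicide prevention hotline" is a test-support mechanism:
        # __init__ starts a 1-second TimerService on this method whenever
        # the hotline file exists, and the node shuts itself down once the
        # file disappears or goes unmodified for more than 40 seconds. A
        # test harness keeps the node alive by periodically re-touching the
        # file, e.g. (illustrative, with 'basedir' a hypothetical variable):
        #
        #   open(os.path.join(basedir, "suicide_prevention_hotline"), "w").close()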

    def get_all_peerids(self):
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        return self.introducer_client.get_nickname_for_peerid(peerid)

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
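
        # illustrative test usage ('client' and 'do_upload' are hypothetical
        # test fixtures):
        #
        #   d = client.debug_wait_for_client_connections(5)
        #   d.addCallback(lambda ign: do_upload())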


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        # this returns synchronously.
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    key = base32.b2a(u.storage_index)
                    cachefile = self.download_cache.get_file(key)
                    node = FileNode(u.to_string(), self, cachefile) # CHK
            else:
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
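
        # sketch of intended use (the cap string is an illustrative
        # placeholder):
        #
        #   node = client.create_node_from_uri("URI:DIR2:...")
        #
        # returns a (cached, per-URI) NewDirectoryNode; file caps likewise
        # map to LiteralFileNode, FileNode (CHK), or MutableFileNode.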

    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)

    def create_empty_dirnode(self):
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def _generate_pubprivkeys(self, key_size):
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


    def list_all_upload_statuses(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()