]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
immutable/download.py move recent-downloads history out of Downloader and into a...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
30
# Binary (power-of-two) size units, expressed in bytes.
KiB = 2 ** 10
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
36
class StubClient(Referenceable):
    """Empty remotely-referenceable object declaring the RIStubClient
    interface. It defines no methods or state of its own."""
    implements(RIStubClient)
39
def _make_secret():
    """Return a new random secret: CRYPTO_VAL_SIZE bytes from os.urandom,
    base32-encoded, followed by a newline."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
42
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: a Node subclass that wires up the introducer
    client, stats provider, optional storage server, helper, uploader,
    downloader and frontends, and exposes the primitives for creating
    file and directory nodes."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # If this file exists in the basedir, __init__ starts a timer that
    # stops the reactor once the file goes missing or becomes too old
    # (see _check_hotline).
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # Default encoding parameters, as a dict:
    #  "k" (needed): the number of shares required to reconstruct a file.
    #  "happy" (desired): we will abort an upload unless we can allocate
    #    space for at least this many shares.
    #  "n" (total): the total number of shares created by encoding. If
    #    everybody has room then this is how many we will upload.
    #  "max_segment_size": in bytes.
    # Each instance takes its own copy in __init__, and init_client() may
    # override k/n/happy from the [client] config section.
    DEFAULT_ENCODING_PARAMETERS = dict(k=3,
                                       happy=7,
                                       n=10,
                                       max_segment_size=128*KiB)
65
66     def __init__(self, basedir="."):
67         node.Node.__init__(self, basedir)
68         self.started_timestamp = time.time()
69         self.logSource="Client"
70         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
71         self.init_introducer_client()
72         self.init_stats_provider()
73         self.init_lease_secret()
74         self.init_storage()
75         self.init_control()
76         if self.get_config("helper", "enabled", False, boolean=True):
77             self.init_helper()
78         self.init_client()
79         self._key_generator = None
80         key_gen_furl = self.get_config("client", "key_generator.furl", None)
81         if key_gen_furl:
82             self.init_key_gen(key_gen_furl)
83         # ControlServer and Helper are attached after Tub startup
84         self.init_ftp_server()
85         self.init_sftp_server()
86
87         hotline_file = os.path.join(self.basedir,
88                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
89         if os.path.exists(hotline_file):
90             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
91             self.log("hotline file noticed (%ds old), starting timer" % age)
92             hotline = TimerService(1.0, self._check_hotline, hotline_file)
93             hotline.setServiceParent(self)
94
95         webport = self.get_config("node", "web.port", None)
96         if webport:
97             self.init_web(webport) # strports string
98
99     def read_old_config_files(self):
100         node.Node.read_old_config_files(self)
101         copy = self._copy_config_from_file
102         copy("introducer.furl", "client", "introducer.furl")
103         copy("helper.furl", "client", "helper.furl")
104         copy("key_generator.furl", "client", "key_generator.furl")
105         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
106         if os.path.exists(os.path.join(self.basedir, "no_storage")):
107             self.set_config("storage", "enabled", "false")
108         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
109             self.set_config("storage", "readonly", "true")
110         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
111             self.set_config("storage", "debug_discard", "true")
112         if os.path.exists(os.path.join(self.basedir, "run_helper")):
113             self.set_config("helper", "enabled", "true")
114
115     def init_introducer_client(self):
116         self.introducer_furl = self.get_config("client", "introducer.furl")
117         ic = IntroducerClient(self.tub, self.introducer_furl,
118                               self.nickname,
119                               str(allmydata.__version__),
120                               str(self.OLDEST_SUPPORTED_VERSION))
121         self.introducer_client = ic
122         # hold off on starting the IntroducerClient until our tub has been
123         # started, so we'll have a useful address on our RemoteReference, so
124         # that the introducer's status page will show us.
125         d = self.when_tub_ready()
126         def _start_introducer_client(res):
127             ic.setServiceParent(self)
128             # nodes that want to upload and download will need storage servers
129             ic.subscribe_to("storage")
130         d.addCallback(_start_introducer_client)
131         d.addErrback(log.err, facility="tahoe.init",
132                      level=log.BAD, umid="URyI5w")
133
134     def init_stats_provider(self):
135         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
136         self.stats_provider = StatsProvider(self, gatherer_furl)
137         self.add_service(self.stats_provider)
138         self.stats_provider.register_producer(self)
139
140     def get_stats(self):
141         return { 'node.uptime': time.time() - self.started_timestamp }
142
143     def init_lease_secret(self):
144         secret_s = self.get_or_create_private_config("secret", _make_secret)
145         self._lease_secret = base32.a2b(secret_s)
146
147     def init_storage(self):
148         # should we run a storage server (and publish it for others to use)?
149         if not self.get_config("storage", "enabled", True, boolean=True):
150             return
151         readonly = self.get_config("storage", "readonly", False, boolean=True)
152
153         storedir = os.path.join(self.basedir, self.STOREDIR)
154
155         data = self.get_config("storage", "reserved_space", None)
156         reserved = None
157         try:
158             reserved = parse_abbreviated_size(data)
159         except ValueError:
160             log.msg("[storage]reserved_space= contains unparseable value %s"
161                     % data)
162         if reserved is None:
163             reserved = 0
164         discard = self.get_config("storage", "debug_discard", False,
165                                   boolean=True)
166         ss = StorageServer(storedir,
167                            reserved_space=reserved,
168                            discard_storage=discard,
169                            readonly_storage=readonly,
170                            stats_provider=self.stats_provider)
171         self.add_service(ss)
172         d = self.when_tub_ready()
173         # we can't do registerReference until the Tub is ready
174         def _publish(res):
175             furl_file = os.path.join(self.basedir, "private", "storage.furl")
176             furl = self.tub.registerReference(ss, furlFile=furl_file)
177             ri_name = RIStorageServer.__remote_name__
178             self.introducer_client.publish(furl, "storage", ri_name)
179         d.addCallback(_publish)
180         d.addErrback(log.err, facility="tahoe.init",
181                      level=log.BAD, umid="aLGBKw")
182
183     def init_client(self):
184         helper_furl = self.get_config("client", "helper.furl", None)
185         DEP = self.DEFAULT_ENCODING_PARAMETERS
186         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
187         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
188         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
189         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
190         self.convergence = base32.a2b(convergence_s)
191         self._node_cache = weakref.WeakValueDictionary() # uri -> node
192         self.add_service(History())
193         self.add_service(Uploader(helper_furl, self.stats_provider))
194         download_cachedir = os.path.join(self.basedir,
195                                          "private", "cache", "download")
196         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
197         self.download_cache.setServiceParent(self)
198         self.add_service(Downloader(self.stats_provider))
199         self.add_service(MutableWatcher(self.stats_provider))
200         def _publish(res):
201             # we publish an empty object so that the introducer can count how
202             # many clients are connected and see what versions they're
203             # running.
204             sc = StubClient()
205             furl = self.tub.registerReference(sc)
206             ri_name = RIStubClient.__remote_name__
207             self.introducer_client.publish(furl, "stub_client", ri_name)
208         d = self.when_tub_ready()
209         d.addCallback(_publish)
210         d.addErrback(log.err, facility="tahoe.init",
211                      level=log.BAD, umid="OEHq3g")
212
213     def get_history(self):
214         return self.getServiceNamed("history")
215
216     def init_control(self):
217         d = self.when_tub_ready()
218         def _publish(res):
219             c = ControlServer()
220             c.setServiceParent(self)
221             control_url = self.tub.registerReference(c)
222             self.write_private_config("control.furl", control_url + "\n")
223         d.addCallback(_publish)
224         d.addErrback(log.err, facility="tahoe.init",
225                      level=log.BAD, umid="d3tNXA")
226
227     def init_helper(self):
228         d = self.when_tub_ready()
229         def _publish(self):
230             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
231             h.setServiceParent(self)
232             # TODO: this is confusing. BASEDIR/private/helper.furl is created
233             # by the helper. BASEDIR/helper.furl is consumed by the client
234             # who wants to use the helper. I like having the filename be the
235             # same, since that makes 'cp' work smoothly, but the difference
236             # between config inputs and generated outputs is hard to see.
237             helper_furlfile = os.path.join(self.basedir,
238                                            "private", "helper.furl")
239             self.tub.registerReference(h, furlFile=helper_furlfile)
240         d.addCallback(_publish)
241         d.addErrback(log.err, facility="tahoe.init",
242                      level=log.BAD, umid="K0mW5w")
243
244     def init_key_gen(self, key_gen_furl):
245         d = self.when_tub_ready()
246         def _subscribe(self):
247             self.tub.connectTo(key_gen_furl, self._got_key_generator)
248         d.addCallback(_subscribe)
249         d.addErrback(log.err, facility="tahoe.init",
250                      level=log.BAD, umid="z9DMzw")
251
252     def _got_key_generator(self, key_generator):
253         self._key_generator = key_generator
254         key_generator.notifyOnDisconnect(self._lost_key_generator)
255
256     def _lost_key_generator(self):
257         self._key_generator = None
258
259     def get_servers(self, service_name):
260         """ Return set of (peerid, versioned-rref) """
261         assert isinstance(service_name, str)
262         return self.introducer_client.get_peers(service_name)
263
264     def init_web(self, webport):
265         self.log("init_web(webport=%s)", args=(webport,))
266
267         from allmydata.webish import WebishServer
268         nodeurl_path = os.path.join(self.basedir, "node.url")
269         staticdir = self.get_config("node", "web.static", "public_html")
270         staticdir = os.path.expanduser(staticdir)
271         ws = WebishServer(webport, nodeurl_path, staticdir)
272         self.add_service(ws)
273
274     def init_ftp_server(self):
275         if self.get_config("ftpd", "enabled", False, boolean=True):
276             accountfile = self.get_config("ftpd", "accounts.file", None)
277             accounturl = self.get_config("ftpd", "accounts.url", None)
278             ftp_portstr = self.get_config("ftpd", "port", "8021")
279
280             from allmydata.frontends import ftpd
281             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
282             s.setServiceParent(self)
283
284     def init_sftp_server(self):
285         if self.get_config("sftpd", "enabled", False, boolean=True):
286             accountfile = self.get_config("sftpd", "accounts.file", None)
287             accounturl = self.get_config("sftpd", "accounts.url", None)
288             sftp_portstr = self.get_config("sftpd", "port", "8022")
289             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
290             privkey_file = self.get_config("sftpd", "host_privkey_file")
291
292             from allmydata.frontends import sftpd
293             s = sftpd.SFTPServer(self, accountfile, accounturl,
294                                  sftp_portstr, pubkey_file, privkey_file)
295             s.setServiceParent(self)
296
297     def _check_hotline(self, hotline_file):
298         if os.path.exists(hotline_file):
299             mtime = os.stat(hotline_file)[stat.ST_MTIME]
300             if mtime > time.time() - 40.0:
301                 return
302             else:
303                 self.log("hotline file too old, shutting down")
304         else:
305             self.log("hotline file missing, shutting down")
306         reactor.stop()
307
308     def get_all_peerids(self):
309         return self.introducer_client.get_all_peerids()
310     def get_nickname_for_peerid(self, peerid):
311         return self.introducer_client.get_nickname_for_peerid(peerid)
312
313     def get_permuted_peers(self, service_name, key):
314         """
315         @return: list of (peerid, connection,)
316         """
317         assert isinstance(service_name, str)
318         assert isinstance(key, str)
319         return self.introducer_client.get_permuted_peers(service_name, key)
320
321     def get_encoding_parameters(self):
322         return self.DEFAULT_ENCODING_PARAMETERS
323
324     def connected_to_introducer(self):
325         if self.introducer_client:
326             return self.introducer_client.connected_to_introducer()
327         return False
328
329     def get_renewal_secret(self):
330         return hashutil.my_renewal_secret_hash(self._lease_secret)
331
332     def get_cancel_secret(self):
333         return hashutil.my_cancel_secret_hash(self._lease_secret)
334
335     def debug_wait_for_client_connections(self, num_clients):
336         """Return a Deferred that fires (with None) when we have connections
337         to the given number of peers. Useful for tests that set up a
338         temporary test network and need to know when it is safe to proceed
339         with an upload or download."""
340         def _check():
341             current_clients = list(self.get_all_peerids())
342             return len(current_clients) >= num_clients
343         d = self.poll(_check, 0.5)
344         d.addCallback(lambda res: None)
345         return d
346
347
348     # these four methods are the primitives for creating filenodes and
349     # dirnodes. The first takes a URI and produces a filenode or (new-style)
350     # dirnode. The other three create brand-new filenodes/dirnodes.
351
352     def create_node_from_uri(self, u):
353         # this returns synchronously.
354         u = IURI(u)
355         u_s = u.to_string()
356         if u_s not in self._node_cache:
357             if IReadonlyNewDirectoryURI.providedBy(u):
358                 # new-style read-only dirnodes
359                 node = NewDirectoryNode(self).init_from_uri(u)
360             elif INewDirectoryURI.providedBy(u):
361                 # new-style dirnodes
362                 node = NewDirectoryNode(self).init_from_uri(u)
363             elif IFileURI.providedBy(u):
364                 if isinstance(u, LiteralFileURI):
365                     node = LiteralFileNode(u, self) # LIT
366                 else:
367                     key = base32.b2a(u.storage_index)
368                     cachefile = self.download_cache.get_file(key)
369                     node = FileNode(u, self, cachefile) # CHK
370             else:
371                 assert IMutableFileURI.providedBy(u), u
372                 node = MutableFileNode(self).init_from_uri(u)
373             self._node_cache[u_s] = node
374         return self._node_cache[u_s]
375
376     def notify_publish(self, publish_status, size):
377         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
378                                                                size)
379     def notify_retrieve(self, retrieve_status):
380         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
381     def notify_mapupdate(self, update_status):
382         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
383
384     def create_empty_dirnode(self):
385         n = NewDirectoryNode(self)
386         d = n.create(self._generate_pubprivkeys)
387         d.addCallback(lambda res: n)
388         return d
389
390     def create_mutable_file(self, contents=""):
391         n = MutableFileNode(self)
392         d = n.create(contents, self._generate_pubprivkeys)
393         d.addCallback(lambda res: n)
394         return d
395
396     def _generate_pubprivkeys(self, key_size):
397         if self._key_generator:
398             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
399             def make_key_objs((verifying_key, signing_key)):
400                 v = rsa.create_verifying_key_from_string(verifying_key)
401                 s = rsa.create_signing_key_from_string(signing_key)
402                 return v, s
403             d.addCallback(make_key_objs)
404             return d
405         else:
406             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
407             # secs
408             signer = rsa.generate(key_size)
409             verifier = signer.get_verifying_key()
410             return verifier, signer
411
412     def upload(self, uploadable):
413         uploader = self.getServiceNamed("uploader")
414         return uploader.upload(uploadable)
415
416
417     def list_all_upload_statuses(self):
418         uploader = self.getServiceNamed("uploader")
419         return uploader.list_all_upload_statuses()
420
421     def list_all_download_statuses(self):
422         return self.get_history().list_all_download_statuses()
423
424     def list_all_mapupdate_statuses(self):
425         watcher = self.getServiceNamed("mutable-watcher")
426         return watcher.list_all_mapupdate_statuses()
427     def list_all_publish_statuses(self):
428         watcher = self.getServiceNamed("mutable-watcher")
429         return watcher.list_all_publish_statuses()
430     def list_all_retrieve_statuses(self):
431         watcher = self.getServiceNamed("mutable-watcher")
432         return watcher.list_all_retrieve_statuses()
433
434     def list_all_helper_statuses(self):
435         try:
436             helper = self.getServiceNamed("helper")
437         except KeyError:
438             return []
439         return helper.get_all_upload_statuses()
440