]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
mutable: rename mutable/node.py to mutable/filenode.py and mutable/repair.py to mutab...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
26 from allmydata.stats import StatsProvider
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
29
# Binary (power-of-1024) size multipliers, expressed in bytes.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
35
class StubClient(Referenceable):
    """Empty Referenceable that declares the RIStubClient interface.

    It has no methods or state of its own.
    """
    implements(RIStubClient)
38
def _make_secret():
    """Return a new random secret as a newline-terminated base32 string.

    hashutil.CRYPTO_VAL_SIZE bytes are drawn from os.urandom() and encoded
    with base32.b2a().
    """
    raw_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(raw_bytes)
    return encoded + "\n"
41
42 class Client(node.Node, pollmixin.PollMixin):
43     implements(IStatsProducer)
44
45     PORTNUMFILE = "client.port"
46     STOREDIR = 'storage'
47     NODETYPE = "client"
48     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
49
50     # This means that if a storage server treats me as though I were a
51     # 1.0.0 storage client, it will work as they expect.
52     OLDEST_SUPPORTED_VERSION = "1.0.0"
53
    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total) and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
59     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
60                                    "happy": 7,
61                                    "n": 10,
62                                    "max_segment_size": 128*KiB,
63                                    }
64
65     def __init__(self, basedir="."):
66         node.Node.__init__(self, basedir)
67         self.started_timestamp = time.time()
68         self.logSource="Client"
69         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
70         self.init_introducer_client()
71         self.init_stats_provider()
72         self.init_lease_secret()
73         self.init_storage()
74         self.init_control()
75         if self.get_config("helper", "enabled", False, boolean=True):
76             self.init_helper()
77         self.init_client()
78         self._key_generator = None
79         key_gen_furl = self.get_config("client", "key_generator.furl", None)
80         if key_gen_furl:
81             self.init_key_gen(key_gen_furl)
82         # ControlServer and Helper are attached after Tub startup
83         self.init_ftp_server()
84         self.init_sftp_server()
85
86         hotline_file = os.path.join(self.basedir,
87                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
88         if os.path.exists(hotline_file):
89             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
90             self.log("hotline file noticed (%ds old), starting timer" % age)
91             hotline = TimerService(1.0, self._check_hotline, hotline_file)
92             hotline.setServiceParent(self)
93
94         webport = self.get_config("node", "web.port", None)
95         if webport:
96             self.init_web(webport) # strports string
97
98     def read_old_config_files(self):
99         node.Node.read_old_config_files(self)
100         copy = self._copy_config_from_file
101         copy("introducer.furl", "client", "introducer.furl")
102         copy("helper.furl", "client", "helper.furl")
103         copy("key_generator.furl", "client", "key_generator.furl")
104         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
105         if os.path.exists(os.path.join(self.basedir, "no_storage")):
106             self.set_config("storage", "enabled", "false")
107         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
108             self.set_config("storage", "readonly", "true")
109         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
110             self.set_config("storage", "debug_discard", "true")
111         if os.path.exists(os.path.join(self.basedir, "run_helper")):
112             self.set_config("helper", "enabled", "true")
113
114     def init_introducer_client(self):
115         self.introducer_furl = self.get_config("client", "introducer.furl")
116         ic = IntroducerClient(self.tub, self.introducer_furl,
117                               self.nickname,
118                               str(allmydata.__version__),
119                               str(self.OLDEST_SUPPORTED_VERSION))
120         self.introducer_client = ic
121         # hold off on starting the IntroducerClient until our tub has been
122         # started, so we'll have a useful address on our RemoteReference, so
123         # that the introducer's status page will show us.
124         d = self.when_tub_ready()
125         def _start_introducer_client(res):
126             ic.setServiceParent(self)
127             # nodes that want to upload and download will need storage servers
128             ic.subscribe_to("storage")
129         d.addCallback(_start_introducer_client)
130         d.addErrback(log.err, facility="tahoe.init",
131                      level=log.BAD, umid="URyI5w")
132
133     def init_stats_provider(self):
134         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
135         self.stats_provider = StatsProvider(self, gatherer_furl)
136         self.add_service(self.stats_provider)
137         self.stats_provider.register_producer(self)
138
139     def get_stats(self):
140         return { 'node.uptime': time.time() - self.started_timestamp }
141
142     def init_lease_secret(self):
143         secret_s = self.get_or_create_private_config("secret", _make_secret)
144         self._lease_secret = base32.a2b(secret_s)
145
146     def init_storage(self):
147         # should we run a storage server (and publish it for others to use)?
148         if not self.get_config("storage", "enabled", True, boolean=True):
149             return
150         readonly = self.get_config("storage", "readonly", False, boolean=True)
151
152         storedir = os.path.join(self.basedir, self.STOREDIR)
153
154         data = self.get_config("storage", "reserved_space", None)
155         reserved = None
156         try:
157             reserved = parse_abbreviated_size(data)
158         except ValueError:
159             log.msg("[storage]reserved_space= contains unparseable value %s"
160                     % data)
161         if reserved is None:
162             reserved = 0
163         discard = self.get_config("storage", "debug_discard", False,
164                                   boolean=True)
165         ss = StorageServer(storedir,
166                            reserved_space=reserved,
167                            discard_storage=discard,
168                            readonly_storage=readonly,
169                            stats_provider=self.stats_provider)
170         self.add_service(ss)
171         d = self.when_tub_ready()
172         # we can't do registerReference until the Tub is ready
173         def _publish(res):
174             furl_file = os.path.join(self.basedir, "private", "storage.furl")
175             furl = self.tub.registerReference(ss, furlFile=furl_file)
176             ri_name = RIStorageServer.__remote_name__
177             self.introducer_client.publish(furl, "storage", ri_name)
178         d.addCallback(_publish)
179         d.addErrback(log.err, facility="tahoe.init",
180                      level=log.BAD, umid="aLGBKw")
181
182     def init_client(self):
183         helper_furl = self.get_config("client", "helper.furl", None)
184         DEP = self.DEFAULT_ENCODING_PARAMETERS
185         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
186         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
187         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
188         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
189         self.convergence = base32.a2b(convergence_s)
190         self._node_cache = weakref.WeakValueDictionary() # uri -> node
191         self.add_service(Uploader(helper_furl, self.stats_provider))
192         download_cachedir = os.path.join(self.basedir,
193                                          "private", "cache", "download")
194         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
195         self.download_cache.setServiceParent(self)
196         self.add_service(Downloader(self.stats_provider))
197         self.add_service(MutableWatcher(self.stats_provider))
198         def _publish(res):
199             # we publish an empty object so that the introducer can count how
200             # many clients are connected and see what versions they're
201             # running.
202             sc = StubClient()
203             furl = self.tub.registerReference(sc)
204             ri_name = RIStubClient.__remote_name__
205             self.introducer_client.publish(furl, "stub_client", ri_name)
206         d = self.when_tub_ready()
207         d.addCallback(_publish)
208         d.addErrback(log.err, facility="tahoe.init",
209                      level=log.BAD, umid="OEHq3g")
210
211     def init_control(self):
212         d = self.when_tub_ready()
213         def _publish(res):
214             c = ControlServer()
215             c.setServiceParent(self)
216             control_url = self.tub.registerReference(c)
217             self.write_private_config("control.furl", control_url + "\n")
218         d.addCallback(_publish)
219         d.addErrback(log.err, facility="tahoe.init",
220                      level=log.BAD, umid="d3tNXA")
221
222     def init_helper(self):
223         d = self.when_tub_ready()
224         def _publish(self):
225             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
226             h.setServiceParent(self)
227             # TODO: this is confusing. BASEDIR/private/helper.furl is created
228             # by the helper. BASEDIR/helper.furl is consumed by the client
229             # who wants to use the helper. I like having the filename be the
230             # same, since that makes 'cp' work smoothly, but the difference
231             # between config inputs and generated outputs is hard to see.
232             helper_furlfile = os.path.join(self.basedir,
233                                            "private", "helper.furl")
234             self.tub.registerReference(h, furlFile=helper_furlfile)
235         d.addCallback(_publish)
236         d.addErrback(log.err, facility="tahoe.init",
237                      level=log.BAD, umid="K0mW5w")
238
239     def init_key_gen(self, key_gen_furl):
240         d = self.when_tub_ready()
241         def _subscribe(self):
242             self.tub.connectTo(key_gen_furl, self._got_key_generator)
243         d.addCallback(_subscribe)
244         d.addErrback(log.err, facility="tahoe.init",
245                      level=log.BAD, umid="z9DMzw")
246
247     def _got_key_generator(self, key_generator):
248         self._key_generator = key_generator
249         key_generator.notifyOnDisconnect(self._lost_key_generator)
250
251     def _lost_key_generator(self):
252         self._key_generator = None
253
254     def init_web(self, webport):
255         self.log("init_web(webport=%s)", args=(webport,))
256
257         from allmydata.webish import WebishServer
258         nodeurl_path = os.path.join(self.basedir, "node.url")
259         staticdir = self.get_config("node", "web.static", "public_html")
260         staticdir = os.path.expanduser(staticdir)
261         ws = WebishServer(webport, nodeurl_path, staticdir)
262         self.add_service(ws)
263
264     def init_ftp_server(self):
265         if self.get_config("ftpd", "enabled", False, boolean=True):
266             accountfile = self.get_config("ftpd", "accounts.file", None)
267             accounturl = self.get_config("ftpd", "accounts.url", None)
268             ftp_portstr = self.get_config("ftpd", "port", "8021")
269
270             from allmydata.frontends import ftpd
271             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
272             s.setServiceParent(self)
273
274     def init_sftp_server(self):
275         if self.get_config("sftpd", "enabled", False, boolean=True):
276             accountfile = self.get_config("sftpd", "accounts.file", None)
277             accounturl = self.get_config("sftpd", "accounts.url", None)
278             sftp_portstr = self.get_config("sftpd", "port", "8022")
279             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
280             privkey_file = self.get_config("sftpd", "host_privkey_file")
281
282             from allmydata.frontends import sftpd
283             s = sftpd.SFTPServer(self, accountfile, accounturl,
284                                  sftp_portstr, pubkey_file, privkey_file)
285             s.setServiceParent(self)
286
287     def _check_hotline(self, hotline_file):
288         if os.path.exists(hotline_file):
289             mtime = os.stat(hotline_file)[stat.ST_MTIME]
290             if mtime > time.time() - 20.0:
291                 return
292             else:
293                 self.log("hotline file too old, shutting down")
294         else:
295             self.log("hotline file missing, shutting down")
296         reactor.stop()
297
298     def get_all_peerids(self):
299         return self.introducer_client.get_all_peerids()
300     def get_nickname_for_peerid(self, peerid):
301         return self.introducer_client.get_nickname_for_peerid(peerid)
302
303     def get_permuted_peers(self, service_name, key):
304         """
305         @return: list of (peerid, connection,)
306         """
307         assert isinstance(service_name, str)
308         assert isinstance(key, str)
309         return self.introducer_client.get_permuted_peers(service_name, key)
310
311     def get_encoding_parameters(self):
312         return self.DEFAULT_ENCODING_PARAMETERS
313
314     def connected_to_introducer(self):
315         if self.introducer_client:
316             return self.introducer_client.connected_to_introducer()
317         return False
318
319     def get_renewal_secret(self):
320         return hashutil.my_renewal_secret_hash(self._lease_secret)
321
322     def get_cancel_secret(self):
323         return hashutil.my_cancel_secret_hash(self._lease_secret)
324
325     def debug_wait_for_client_connections(self, num_clients):
326         """Return a Deferred that fires (with None) when we have connections
327         to the given number of peers. Useful for tests that set up a
328         temporary test network and need to know when it is safe to proceed
329         with an upload or download."""
330         def _check():
331             current_clients = list(self.get_all_peerids())
332             return len(current_clients) >= num_clients
333         d = self.poll(_check, 0.5)
334         d.addCallback(lambda res: None)
335         return d
336
337
338     # these four methods are the primitives for creating filenodes and
339     # dirnodes. The first takes a URI and produces a filenode or (new-style)
340     # dirnode. The other three create brand-new filenodes/dirnodes.
341
342     def create_node_from_uri(self, u):
343         # this returns synchronously.
344         u = IURI(u)
345         u_s = u.to_string()
346         if u_s not in self._node_cache:
347             if IReadonlyNewDirectoryURI.providedBy(u):
348                 # new-style read-only dirnodes
349                 node = NewDirectoryNode(self).init_from_uri(u)
350             elif INewDirectoryURI.providedBy(u):
351                 # new-style dirnodes
352                 node = NewDirectoryNode(self).init_from_uri(u)
353             elif IFileURI.providedBy(u):
354                 if isinstance(u, LiteralFileURI):
355                     node = LiteralFileNode(u, self) # LIT
356                 else:
357                     key = base32.b2a(u.storage_index)
358                     cachefile = self.download_cache.get_file(key)
359                     node = FileNode(u, self, cachefile) # CHK
360             else:
361                 assert IMutableFileURI.providedBy(u), u
362                 node = MutableFileNode(self).init_from_uri(u)
363             self._node_cache[u_s] = node
364         return self._node_cache[u_s]
365
366     def notify_publish(self, publish_status, size):
367         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
368                                                                size)
369     def notify_retrieve(self, retrieve_status):
370         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
371     def notify_mapupdate(self, update_status):
372         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
373
374     def create_empty_dirnode(self):
375         n = NewDirectoryNode(self)
376         d = n.create(self._generate_pubprivkeys)
377         d.addCallback(lambda res: n)
378         return d
379
380     def create_mutable_file(self, contents=""):
381         n = MutableFileNode(self)
382         d = n.create(contents, self._generate_pubprivkeys)
383         d.addCallback(lambda res: n)
384         return d
385
386     def _generate_pubprivkeys(self, key_size):
387         if self._key_generator:
388             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
389             def make_key_objs((verifying_key, signing_key)):
390                 v = rsa.create_verifying_key_from_string(verifying_key)
391                 s = rsa.create_signing_key_from_string(signing_key)
392                 return v, s
393             d.addCallback(make_key_objs)
394             return d
395         else:
396             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
397             # secs
398             signer = rsa.generate(key_size)
399             verifier = signer.get_verifying_key()
400             return verifier, signer
401
402     def upload(self, uploadable):
403         uploader = self.getServiceNamed("uploader")
404         return uploader.upload(uploadable)
405
406
407     def list_all_upload_statuses(self):
408         uploader = self.getServiceNamed("uploader")
409         return uploader.list_all_upload_statuses()
410
411     def list_all_download_statuses(self):
412         downloader = self.getServiceNamed("downloader")
413         return downloader.list_all_download_statuses()
414
415     def list_all_mapupdate_statuses(self):
416         watcher = self.getServiceNamed("mutable-watcher")
417         return watcher.list_all_mapupdate_statuses()
418     def list_all_publish_statuses(self):
419         watcher = self.getServiceNamed("mutable-watcher")
420         return watcher.list_all_publish_statuses()
421     def list_all_retrieve_statuses(self):
422         watcher = self.getServiceNamed("mutable-watcher")
423         return watcher.list_all_retrieve_statuses()
424
425     def list_all_helper_statuses(self):
426         try:
427             helper = self.getServiceNamed("helper")
428         except KeyError:
429             return []
430         return helper.get_all_upload_statuses()
431