]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
add 'web.ambient_upload_authority' as a parameter to tahoe.cfg
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
30
# Binary (power-of-two) size units; used below for the default
# max_segment_size and available to the rest of the module.
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
36
class StubClient(Referenceable):
    """Empty remote object published to the introducer (see
    Client.init_client) so it can count connected clients and see what
    versions they are running, without granting any real authority."""
    implements(RIStubClient)
39
def _make_secret():
    """Generate a fresh random secret: CRYPTO_VAL_SIZE random bytes,
    base32-encoded, newline-terminated for storage in a private file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return "%s\n" % base32.b2a(raw)
42
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    # filenames (relative to basedir) and node identity, consumed by node.Node
    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # presence of this file arms a shutdown watchdog; see _check_hotline
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of default erasure-coding parameters. 'k' is the number
    # of shares required to reconstruct a file. 'happy' means that we will
    # abort an upload unless we can allocate space for at least this many.
    # 'n' is the total number of shares created by encoding. If everybody
    # has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
65
    def __init__(self, basedir="."):
        """Create a Tahoe client node rooted at 'basedir'.

        Initialization order matters: the introducer client is created
        before the services that publish through it, and Tub-dependent
        registration is deferred until the Tub is ready.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # feeds the node.uptime stat
        self.logSource="Client"
        # per-instance copy, so config overrides in init_client don't
        # mutate the shared class-level dict
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists, poll it every second and shut down
        # when it goes stale or disappears (see _check_hotline);
        # NOTE(review): presumably a watchdog for test harnesses — confirm
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
98
99     def read_old_config_files(self):
100         node.Node.read_old_config_files(self)
101         copy = self._copy_config_from_file
102         copy("introducer.furl", "client", "introducer.furl")
103         copy("helper.furl", "client", "helper.furl")
104         copy("key_generator.furl", "client", "key_generator.furl")
105         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
106         if os.path.exists(os.path.join(self.basedir, "no_storage")):
107             self.set_config("storage", "enabled", "false")
108         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
109             self.set_config("storage", "readonly", "true")
110         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
111             self.set_config("storage", "debug_discard", "true")
112         if os.path.exists(os.path.join(self.basedir, "run_helper")):
113             self.set_config("helper", "enabled", "true")
114
115     def init_introducer_client(self):
116         self.introducer_furl = self.get_config("client", "introducer.furl")
117         ic = IntroducerClient(self.tub, self.introducer_furl,
118                               self.nickname,
119                               str(allmydata.__version__),
120                               str(self.OLDEST_SUPPORTED_VERSION))
121         self.introducer_client = ic
122         # hold off on starting the IntroducerClient until our tub has been
123         # started, so we'll have a useful address on our RemoteReference, so
124         # that the introducer's status page will show us.
125         d = self.when_tub_ready()
126         def _start_introducer_client(res):
127             ic.setServiceParent(self)
128             # nodes that want to upload and download will need storage servers
129             ic.subscribe_to("storage")
130         d.addCallback(_start_introducer_client)
131         d.addErrback(log.err, facility="tahoe.init",
132                      level=log.BAD, umid="URyI5w")
133
134     def init_stats_provider(self):
135         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
136         self.stats_provider = StatsProvider(self, gatherer_furl)
137         self.add_service(self.stats_provider)
138         self.stats_provider.register_producer(self)
139
140     def get_stats(self):
141         return { 'node.uptime': time.time() - self.started_timestamp }
142
143     def init_lease_secret(self):
144         secret_s = self.get_or_create_private_config("secret", _make_secret)
145         self._lease_secret = base32.a2b(secret_s)
146
147     def init_storage(self):
148         # should we run a storage server (and publish it for others to use)?
149         if not self.get_config("storage", "enabled", True, boolean=True):
150             return
151         readonly = self.get_config("storage", "readonly", False, boolean=True)
152
153         storedir = os.path.join(self.basedir, self.STOREDIR)
154
155         data = self.get_config("storage", "reserved_space", None)
156         reserved = None
157         try:
158             reserved = parse_abbreviated_size(data)
159         except ValueError:
160             log.msg("[storage]reserved_space= contains unparseable value %s"
161                     % data)
162         if reserved is None:
163             reserved = 0
164         discard = self.get_config("storage", "debug_discard", False,
165                                   boolean=True)
166         ss = StorageServer(storedir,
167                            reserved_space=reserved,
168                            discard_storage=discard,
169                            readonly_storage=readonly,
170                            stats_provider=self.stats_provider)
171         self.add_service(ss)
172         d = self.when_tub_ready()
173         # we can't do registerReference until the Tub is ready
174         def _publish(res):
175             furl_file = os.path.join(self.basedir, "private", "storage.furl")
176             furl = self.tub.registerReference(ss, furlFile=furl_file)
177             ri_name = RIStorageServer.__remote_name__
178             self.introducer_client.publish(furl, "storage", ri_name)
179         d.addCallback(_publish)
180         d.addErrback(log.err, facility="tahoe.init",
181                      level=log.BAD, umid="aLGBKw")
182
183     def init_client(self):
184         helper_furl = self.get_config("client", "helper.furl", None)
185         DEP = self.DEFAULT_ENCODING_PARAMETERS
186         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
187         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
188         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
189         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
190         self.convergence = base32.a2b(convergence_s)
191         self._node_cache = weakref.WeakValueDictionary() # uri -> node
192         self.add_service(History(self.stats_provider))
193         self.add_service(Uploader(helper_furl, self.stats_provider))
194         download_cachedir = os.path.join(self.basedir,
195                                          "private", "cache", "download")
196         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
197         self.download_cache.setServiceParent(self)
198         self.add_service(Downloader(self.stats_provider))
199         def _publish(res):
200             # we publish an empty object so that the introducer can count how
201             # many clients are connected and see what versions they're
202             # running.
203             sc = StubClient()
204             furl = self.tub.registerReference(sc)
205             ri_name = RIStubClient.__remote_name__
206             self.introducer_client.publish(furl, "stub_client", ri_name)
207         d = self.when_tub_ready()
208         d.addCallback(_publish)
209         d.addErrback(log.err, facility="tahoe.init",
210                      level=log.BAD, umid="OEHq3g")
211
    def get_history(self):
        """Return the History service (added in init_client)."""
        return self.getServiceNamed("history")
214
215     def init_control(self):
216         d = self.when_tub_ready()
217         def _publish(res):
218             c = ControlServer()
219             c.setServiceParent(self)
220             control_url = self.tub.registerReference(c)
221             self.write_private_config("control.furl", control_url + "\n")
222         d.addCallback(_publish)
223         d.addErrback(log.err, facility="tahoe.init",
224                      level=log.BAD, umid="d3tNXA")
225
226     def init_helper(self):
227         d = self.when_tub_ready()
228         def _publish(self):
229             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
230             h.setServiceParent(self)
231             # TODO: this is confusing. BASEDIR/private/helper.furl is created
232             # by the helper. BASEDIR/helper.furl is consumed by the client
233             # who wants to use the helper. I like having the filename be the
234             # same, since that makes 'cp' work smoothly, but the difference
235             # between config inputs and generated outputs is hard to see.
236             helper_furlfile = os.path.join(self.basedir,
237                                            "private", "helper.furl")
238             self.tub.registerReference(h, furlFile=helper_furlfile)
239         d.addCallback(_publish)
240         d.addErrback(log.err, facility="tahoe.init",
241                      level=log.BAD, umid="K0mW5w")
242
243     def init_key_gen(self, key_gen_furl):
244         d = self.when_tub_ready()
245         def _subscribe(self):
246             self.tub.connectTo(key_gen_furl, self._got_key_generator)
247         d.addCallback(_subscribe)
248         d.addErrback(log.err, facility="tahoe.init",
249                      level=log.BAD, umid="z9DMzw")
250
    def _got_key_generator(self, key_generator):
        # connectTo callback: remember the remote key generator, and clear
        # the reference again when the connection drops
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        # disconnect callback: fall back to local key generation
        # (see _generate_pubprivkeys)
        self._key_generator = None
257
258     def get_servers(self, service_name):
259         """ Return set of (peerid, versioned-rref) """
260         assert isinstance(service_name, str)
261         return self.introducer_client.get_peers(service_name)
262
263     def init_web(self, webport):
264         self.log("init_web(webport=%s)", args=(webport,))
265
266         from allmydata.webish import WebishServer
267         nodeurl_path = os.path.join(self.basedir, "node.url")
268         staticdir = self.get_config("node", "web.static", "public_html")
269         staticdir = os.path.expanduser(staticdir)
270         # should we provide ambient upload authority?
271         ambientUploadAuthority = self.get_config("node", "web.ambient_upload_authority", True, boolean=True)
272         ws = WebishServer(webport, nodeurl_path, staticdir, ambientUploadAuthority)
273         self.add_service(ws)
274
275     def init_ftp_server(self):
276         if self.get_config("ftpd", "enabled", False, boolean=True):
277             accountfile = self.get_config("ftpd", "accounts.file", None)
278             accounturl = self.get_config("ftpd", "accounts.url", None)
279             ftp_portstr = self.get_config("ftpd", "port", "8021")
280
281             from allmydata.frontends import ftpd
282             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
283             s.setServiceParent(self)
284
285     def init_sftp_server(self):
286         if self.get_config("sftpd", "enabled", False, boolean=True):
287             accountfile = self.get_config("sftpd", "accounts.file", None)
288             accounturl = self.get_config("sftpd", "accounts.url", None)
289             sftp_portstr = self.get_config("sftpd", "port", "8022")
290             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
291             privkey_file = self.get_config("sftpd", "host_privkey_file")
292
293             from allmydata.frontends import sftpd
294             s = sftpd.SFTPServer(self, accountfile, accounturl,
295                                  sftp_portstr, pubkey_file, privkey_file)
296             s.setServiceParent(self)
297
298     def _check_hotline(self, hotline_file):
299         if os.path.exists(hotline_file):
300             mtime = os.stat(hotline_file)[stat.ST_MTIME]
301             if mtime > time.time() - 40.0:
302                 return
303             else:
304                 self.log("hotline file too old, shutting down")
305         else:
306             self.log("hotline file missing, shutting down")
307         reactor.stop()
308
    def get_all_peerids(self):
        # delegate to the introducer client, which tracks the known peers
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        # nicknames are learned through introducer announcements
        return self.introducer_client.get_nickname_for_peerid(peerid)
313
314     def get_permuted_peers(self, service_name, key):
315         """
316         @return: list of (peerid, connection,)
317         """
318         assert isinstance(service_name, str)
319         assert isinstance(key, str)
320         return self.introducer_client.get_permuted_peers(service_name, key)
321
    def get_encoding_parameters(self):
        # the per-instance copy, possibly overridden by tahoe.cfg settings
        # applied in init_client
        return self.DEFAULT_ENCODING_PARAMETERS
324
325     def connected_to_introducer(self):
326         if self.introducer_client:
327             return self.introducer_client.connected_to_introducer()
328         return False
329
    def get_renewal_secret(self):
        # lease-renewal secret, derived from the private lease secret
        # loaded in init_lease_secret
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        # lease-cancel secret, derived from the same private lease secret
        return hashutil.my_cancel_secret_hash(self._lease_secret)
335
336     def debug_wait_for_client_connections(self, num_clients):
337         """Return a Deferred that fires (with None) when we have connections
338         to the given number of peers. Useful for tests that set up a
339         temporary test network and need to know when it is safe to proceed
340         with an upload or download."""
341         def _check():
342             current_clients = list(self.get_all_peerids())
343             return len(current_clients) >= num_clients
344         d = self.poll(_check, 0.5)
345         d.addCallback(lambda res: None)
346         return d
347
348
    # these three methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other two create brand-new filenodes/dirnodes.
352
353     def create_node_from_uri(self, u):
354         # this returns synchronously.
355         u = IURI(u)
356         u_s = u.to_string()
357         if u_s not in self._node_cache:
358             if IReadonlyNewDirectoryURI.providedBy(u):
359                 # new-style read-only dirnodes
360                 node = NewDirectoryNode(self).init_from_uri(u)
361             elif INewDirectoryURI.providedBy(u):
362                 # new-style dirnodes
363                 node = NewDirectoryNode(self).init_from_uri(u)
364             elif IFileURI.providedBy(u):
365                 if isinstance(u, LiteralFileURI):
366                     node = LiteralFileNode(u, self) # LIT
367                 else:
368                     key = base32.b2a(u.storage_index)
369                     cachefile = self.download_cache.get_file(key)
370                     node = FileNode(u, self, cachefile) # CHK
371             else:
372                 assert IMutableFileURI.providedBy(u), u
373                 node = MutableFileNode(self).init_from_uri(u)
374             self._node_cache[u_s] = node
375         return self._node_cache[u_s]
376
    def create_empty_dirnode(self):
        """Create a brand-new empty directory; returns a Deferred that
        fires with the new NewDirectoryNode."""
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d
382
    def create_mutable_file(self, contents=""):
        """Create a brand-new mutable file holding 'contents'; returns a
        Deferred that fires with the new MutableFileNode."""
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d
388
389     def _generate_pubprivkeys(self, key_size):
390         if self._key_generator:
391             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
392             def make_key_objs((verifying_key, signing_key)):
393                 v = rsa.create_verifying_key_from_string(verifying_key)
394                 s = rsa.create_signing_key_from_string(signing_key)
395                 return v, s
396             d.addCallback(make_key_objs)
397             return d
398         else:
399             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
400             # secs
401             signer = rsa.generate(key_size)
402             verifier = signer.get_verifying_key()
403             return verifier, signer
404
    def upload(self, uploadable):
        """Hand 'uploadable' to the Uploader service, recording status in
        the History service; returns whatever the uploader returns."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
408
409
    # thin wrappers that delegate status queries to the History service
    def list_all_upload_statuses(self):
        return self.get_history().list_all_upload_statuses()

    def list_all_download_statuses(self):
        return self.get_history().list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        return self.get_history().list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        return self.get_history().list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        return self.get_history().list_all_retrieve_statuses()
422
423     def list_all_helper_statuses(self):
424         try:
425             helper = self.getServiceNamed("helper")
426         except KeyError:
427             return []
428         return helper.get_all_upload_statuses()
429