]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
webapi: serve the /static URL tree from /public_html (configurable)
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, fileutil
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
# Binary size units (powers of 1024), used by the default encoding
# parameters and size-limit parsing below.
KiB, MiB, GiB, TiB, PiB = [1024 ** exponent for exponent in range(1, 6)]
34
class StubClient(Referenceable):
    # An intentionally-empty Referenceable. Client.init_client registers one
    # of these with the Tub and publishes its FURL as "stub_client", so the
    # introducer can count connected clients and see what versions they are
    # running without the clients offering any real service.
    implements(RIStubClient)
def _make_secret():
    """Create a fresh random secret: CRYPTO_VAL_SIZE bytes of OS entropy,
    base32-encoded, with a trailing newline so it stores cleanly in a
    private config file."""
    entropy = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return "%s\n" % base32.b2a(entropy)
40
41 class Client(node.Node, pollmixin.PollMixin):
42     implements(IStatsProducer)
43
44     PORTNUMFILE = "client.port"
45     STOREDIR = 'storage'
46     NODETYPE = "client"
47     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
48
49     # This means that if a storage server treats me as though I were a
50     # 1.0.0 storage client, it will work as they expect.
51     OLDEST_SUPPORTED_VERSION = "1.0.0"
52
53     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
54     # is the number of shares required to reconstruct a file. 'desired' means
55     # that we will abort an upload unless we can allocate space for at least
56     # this many. 'total' is the total number of shares created by encoding.
57     # If everybody has room then this is is how many we will upload.
58     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
59                                    "happy": 7,
60                                    "n": 10,
61                                    "max_segment_size": 128*KiB,
62                                    }
63
64     def __init__(self, basedir="."):
65         node.Node.__init__(self, basedir)
66         self.started_timestamp = time.time()
67         self.logSource="Client"
68         self.init_introducer_client()
69         self.init_stats_provider()
70         self.init_lease_secret()
71         self.init_storage()
72         self.init_control()
73         if self.get_config("helper", "enabled", False, boolean=True):
74             self.init_helper()
75         self.init_client()
76         self._key_generator = None
77         key_gen_furl = self.get_config("client", "key_generator.furl", None)
78         if key_gen_furl:
79             self.init_key_gen(key_gen_furl)
80         # ControlServer and Helper are attached after Tub startup
81         self.init_ftp_server()
82
83         hotline_file = os.path.join(self.basedir,
84                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
85         if os.path.exists(hotline_file):
86             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
87             self.log("hotline file noticed (%ds old), starting timer" % age)
88             hotline = TimerService(1.0, self._check_hotline, hotline_file)
89             hotline.setServiceParent(self)
90
91         webport = self.get_config("node", "web.port", None)
92         if webport:
93             self.init_web(webport) # strports string
94
95     def read_old_config_files(self):
96         node.Node.read_old_config_files(self)
97         copy = self._copy_config_from_file
98         copy("introducer.furl", "client", "introducer.furl")
99         copy("helper.furl", "client", "helper.furl")
100         copy("key_generator.furl", "client", "key_generator.furl")
101         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
102         if os.path.exists(os.path.join(self.basedir, "no_storage")):
103             self.set_config("storage", "enabled", "false")
104         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
105             self.set_config("storage", "readonly", "true")
106         copy("sizelimit", "storage", "sizelimit")
107         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
108             self.set_config("storage", "debug_discard", "true")
109         if os.path.exists(os.path.join(self.basedir, "run_helper")):
110             self.set_config("helper", "enabled", "true")
111
112     def init_introducer_client(self):
113         self.introducer_furl = self.get_config("client", "introducer.furl")
114         ic = IntroducerClient(self.tub, self.introducer_furl,
115                               self.nickname,
116                               str(allmydata.__version__),
117                               str(self.OLDEST_SUPPORTED_VERSION))
118         self.introducer_client = ic
119         # hold off on starting the IntroducerClient until our tub has been
120         # started, so we'll have a useful address on our RemoteReference, so
121         # that the introducer's status page will show us.
122         d = self.when_tub_ready()
123         def _start_introducer_client(res):
124             ic.setServiceParent(self)
125             # nodes that want to upload and download will need storage servers
126             ic.subscribe_to("storage")
127         d.addCallback(_start_introducer_client)
128         d.addErrback(log.err, facility="tahoe.init",
129                      level=log.BAD, umid="URyI5w")
130
131     def init_stats_provider(self):
132         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
133         self.stats_provider = StatsProvider(self, gatherer_furl)
134         self.add_service(self.stats_provider)
135         self.stats_provider.register_producer(self)
136
137     def get_stats(self):
138         return { 'node.uptime': time.time() - self.started_timestamp }
139
140     def init_lease_secret(self):
141         secret_s = self.get_or_create_private_config("secret", _make_secret)
142         self._lease_secret = base32.a2b(secret_s)
143
144     def init_storage(self):
145         # should we run a storage server (and publish it for others to use)?
146         if not self.get_config("storage", "enabled", True, boolean=True):
147             return
148         readonly = self.get_config("storage", "readonly", False, boolean=True)
149
150         storedir = os.path.join(self.basedir, self.STOREDIR)
151
152         sizelimit = None
153         data = self.get_config("storage", "sizelimit", None)
154         if data:
155             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
156             if not m:
157                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
158             else:
159                 number, suffix = m.groups()
160                 suffix = suffix.upper()
161                 if suffix.endswith("B"):
162                     suffix = suffix[:-1]
163                 multiplier = {"": 1,
164                               "K": 1000,
165                               "M": 1000 * 1000,
166                               "G": 1000 * 1000 * 1000,
167                               }[suffix]
168                 sizelimit = int(number) * multiplier
169         discard = self.get_config("storage", "debug_discard", False,
170                                   boolean=True)
171         ss = StorageServer(storedir, sizelimit, discard, readonly,
172                            self.stats_provider)
173         self.add_service(ss)
174         d = self.when_tub_ready()
175         # we can't do registerReference until the Tub is ready
176         def _publish(res):
177             furl_file = os.path.join(self.basedir, "private", "storage.furl")
178             furl = self.tub.registerReference(ss, furlFile=furl_file)
179             ri_name = RIStorageServer.__remote_name__
180             self.introducer_client.publish(furl, "storage", ri_name)
181         d.addCallback(_publish)
182         d.addErrback(log.err, facility="tahoe.init",
183                      level=log.BAD, umid="aLGBKw")
184
185     def init_client(self):
186         helper_furl = self.get_config("client", "helper.furl", None)
187         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
188         self.convergence = base32.a2b(convergence_s)
189         self._node_cache = weakref.WeakValueDictionary() # uri -> node
190         self.add_service(Uploader(helper_furl, self.stats_provider))
191         self.download_cachedir = os.path.join(self.basedir,
192                                               "private", "cache", "download")
193         fileutil.make_dirs(self.download_cachedir)
194         self.add_service(Downloader(self.stats_provider))
195         self.add_service(MutableWatcher(self.stats_provider))
196         def _publish(res):
197             # we publish an empty object so that the introducer can count how
198             # many clients are connected and see what versions they're
199             # running.
200             sc = StubClient()
201             furl = self.tub.registerReference(sc)
202             ri_name = RIStubClient.__remote_name__
203             self.introducer_client.publish(furl, "stub_client", ri_name)
204         d = self.when_tub_ready()
205         d.addCallback(_publish)
206         d.addErrback(log.err, facility="tahoe.init",
207                      level=log.BAD, umid="OEHq3g")
208
209     def init_control(self):
210         d = self.when_tub_ready()
211         def _publish(res):
212             c = ControlServer()
213             c.setServiceParent(self)
214             control_url = self.tub.registerReference(c)
215             self.write_private_config("control.furl", control_url + "\n")
216         d.addCallback(_publish)
217         d.addErrback(log.err, facility="tahoe.init",
218                      level=log.BAD, umid="d3tNXA")
219
220     def init_helper(self):
221         d = self.when_tub_ready()
222         def _publish(self):
223             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
224             h.setServiceParent(self)
225             # TODO: this is confusing. BASEDIR/private/helper.furl is created
226             # by the helper. BASEDIR/helper.furl is consumed by the client
227             # who wants to use the helper. I like having the filename be the
228             # same, since that makes 'cp' work smoothly, but the difference
229             # between config inputs and generated outputs is hard to see.
230             helper_furlfile = os.path.join(self.basedir,
231                                            "private", "helper.furl")
232             self.tub.registerReference(h, furlFile=helper_furlfile)
233         d.addCallback(_publish)
234         d.addErrback(log.err, facility="tahoe.init",
235                      level=log.BAD, umid="K0mW5w")
236
237     def init_key_gen(self, key_gen_furl):
238         d = self.when_tub_ready()
239         def _subscribe(self):
240             self.tub.connectTo(key_gen_furl, self._got_key_generator)
241         d.addCallback(_subscribe)
242         d.addErrback(log.err, facility="tahoe.init",
243                      level=log.BAD, umid="z9DMzw")
244
245     def _got_key_generator(self, key_generator):
246         self._key_generator = key_generator
247         key_generator.notifyOnDisconnect(self._lost_key_generator)
248
249     def _lost_key_generator(self):
250         self._key_generator = None
251
252     def init_web(self, webport):
253         self.log("init_web(webport=%s)", args=(webport,))
254
255         from allmydata.webish import WebishServer
256         nodeurl_path = os.path.join(self.basedir, "node.url")
257         staticdir = self.get_config("node", "web.static", "public_html")
258         staticdir = os.path.expanduser(staticdir)
259         ws = WebishServer(webport, nodeurl_path, staticdir)
260         self.add_service(ws)
261
262     def init_ftp_server(self):
263         if self.get_config("ftpd", "enabled", False, boolean=True):
264             accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
265             accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
266             ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
267
268             from allmydata import ftpd
269             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
270             s.setServiceParent(self)
271
272     def _check_hotline(self, hotline_file):
273         if os.path.exists(hotline_file):
274             mtime = os.stat(hotline_file)[stat.ST_MTIME]
275             if mtime > time.time() - 20.0:
276                 return
277             else:
278                 self.log("hotline file too old, shutting down")
279         else:
280             self.log("hotline file missing, shutting down")
281         reactor.stop()
282
283     def get_all_peerids(self):
284         return self.introducer_client.get_all_peerids()
285     def get_nickname_for_peerid(self, peerid):
286         return self.introducer_client.get_nickname_for_peerid(peerid)
287
288     def get_permuted_peers(self, service_name, key):
289         """
290         @return: list of (peerid, connection,)
291         """
292         assert isinstance(service_name, str)
293         assert isinstance(key, str)
294         return self.introducer_client.get_permuted_peers(service_name, key)
295
296     def get_encoding_parameters(self):
297         return self.DEFAULT_ENCODING_PARAMETERS
298
299     def connected_to_introducer(self):
300         if self.introducer_client:
301             return self.introducer_client.connected_to_introducer()
302         return False
303
304     def get_renewal_secret(self):
305         return hashutil.my_renewal_secret_hash(self._lease_secret)
306
307     def get_cancel_secret(self):
308         return hashutil.my_cancel_secret_hash(self._lease_secret)
309
310     def debug_wait_for_client_connections(self, num_clients):
311         """Return a Deferred that fires (with None) when we have connections
312         to the given number of peers. Useful for tests that set up a
313         temporary test network and need to know when it is safe to proceed
314         with an upload or download."""
315         def _check():
316             current_clients = list(self.get_all_peerids())
317             return len(current_clients) >= num_clients
318         d = self.poll(_check, 0.5)
319         d.addCallback(lambda res: None)
320         return d
321
322
323     # these four methods are the primitives for creating filenodes and
324     # dirnodes. The first takes a URI and produces a filenode or (new-style)
325     # dirnode. The other three create brand-new filenodes/dirnodes.
326
327     def create_node_from_uri(self, u):
328         # this returns synchronously.
329         u = IURI(u)
330         u_s = u.to_string()
331         if u_s not in self._node_cache:
332             if IReadonlyNewDirectoryURI.providedBy(u):
333                 # new-style read-only dirnodes
334                 node = NewDirectoryNode(self).init_from_uri(u)
335             elif INewDirectoryURI.providedBy(u):
336                 # new-style dirnodes
337                 node = NewDirectoryNode(self).init_from_uri(u)
338             elif IFileURI.providedBy(u):
339                 if isinstance(u, LiteralFileURI):
340                     node = LiteralFileNode(u, self) # LIT
341                 else:
342                     cachefile = os.path.join(self.download_cachedir,
343                                              base32.b2a(u.storage_index))
344                     # TODO: cachefile manager, weakref, expire policy
345                     node = FileNode(u, self, cachefile) # CHK
346             else:
347                 assert IMutableFileURI.providedBy(u), u
348                 node = MutableFileNode(self).init_from_uri(u)
349             self._node_cache[u_s] = node
350         return self._node_cache[u_s]
351
352     def notify_publish(self, publish_status, size):
353         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
354                                                                size)
355     def notify_retrieve(self, retrieve_status):
356         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
357     def notify_mapupdate(self, update_status):
358         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
359
360     def create_empty_dirnode(self):
361         n = NewDirectoryNode(self)
362         d = n.create(self._generate_pubprivkeys)
363         d.addCallback(lambda res: n)
364         return d
365
366     def create_mutable_file(self, contents=""):
367         n = MutableFileNode(self)
368         d = n.create(contents, self._generate_pubprivkeys)
369         d.addCallback(lambda res: n)
370         return d
371
372     def _generate_pubprivkeys(self, key_size):
373         if self._key_generator:
374             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
375             def make_key_objs((verifying_key, signing_key)):
376                 v = rsa.create_verifying_key_from_string(verifying_key)
377                 s = rsa.create_signing_key_from_string(signing_key)
378                 return v, s
379             d.addCallback(make_key_objs)
380             return d
381         else:
382             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
383             # secs
384             signer = rsa.generate(key_size)
385             verifier = signer.get_verifying_key()
386             return verifier, signer
387
388     def upload(self, uploadable):
389         uploader = self.getServiceNamed("uploader")
390         return uploader.upload(uploadable)
391
392
393     def list_all_upload_statuses(self):
394         uploader = self.getServiceNamed("uploader")
395         return uploader.list_all_upload_statuses()
396
397     def list_all_download_statuses(self):
398         downloader = self.getServiceNamed("downloader")
399         return downloader.list_all_download_statuses()
400
401     def list_all_mapupdate_statuses(self):
402         watcher = self.getServiceNamed("mutable-watcher")
403         return watcher.list_all_mapupdate_statuses()
404     def list_all_publish_statuses(self):
405         watcher = self.getServiceNamed("mutable-watcher")
406         return watcher.list_all_publish_statuses()
407     def list_all_retrieve_statuses(self):
408         watcher = self.getServiceNamed("mutable-watcher")
409         return watcher.list_all_retrieve_statuses()
410
411     def list_all_helper_statuses(self):
412         try:
413             helper = self.getServiceNamed("helper")
414         except KeyError:
415             return []
416         return helper.get_all_upload_statuses()
417