]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
switch all foolscap imports to use foolscap.api or foolscap.logging
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
30
31 KiB=1024
32 MiB=1024*KiB
33 GiB=1024*MiB
34 TiB=1024*GiB
35 PiB=1024*TiB
36
37 class StubClient(Referenceable):
38     implements(RIStubClient)
39
40 def _make_secret():
41     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
42
43 class Client(node.Node, pollmixin.PollMixin):
44     implements(IStatsProducer)
45
46     PORTNUMFILE = "client.port"
47     STOREDIR = 'storage'
48     NODETYPE = "client"
49     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
50
51     # This means that if a storage server treats me as though I were a
52     # 1.0.0 storage client, it will work as they expect.
53     OLDEST_SUPPORTED_VERSION = "1.0.0"
54
55     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
56     # is the number of shares required to reconstruct a file. 'desired' means
57     # that we will abort an upload unless we can allocate space for at least
58     # this many. 'total' is the total number of shares created by encoding.
59     # If everybody has room then this is is how many we will upload.
60     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
61                                    "happy": 7,
62                                    "n": 10,
63                                    "max_segment_size": 128*KiB,
64                                    }
65
66     def __init__(self, basedir="."):
67         node.Node.__init__(self, basedir)
68         self.started_timestamp = time.time()
69         self.logSource="Client"
70         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
71         self.init_introducer_client()
72         self.init_stats_provider()
73         self.init_lease_secret()
74         self.init_storage()
75         self.init_control()
76         if self.get_config("helper", "enabled", False, boolean=True):
77             self.init_helper()
78         self.init_client()
79         self._key_generator = None
80         key_gen_furl = self.get_config("client", "key_generator.furl", None)
81         if key_gen_furl:
82             self.init_key_gen(key_gen_furl)
83         # ControlServer and Helper are attached after Tub startup
84         self.init_ftp_server()
85         self.init_sftp_server()
86
87         hotline_file = os.path.join(self.basedir,
88                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
89         if os.path.exists(hotline_file):
90             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
91             self.log("hotline file noticed (%ds old), starting timer" % age)
92             hotline = TimerService(1.0, self._check_hotline, hotline_file)
93             hotline.setServiceParent(self)
94
95         # this needs to happen last, so it can use getServiceNamed() to
96         # acquire references to StorageServer and other web-statusable things
97         webport = self.get_config("node", "web.port", None)
98         if webport:
99             self.init_web(webport) # strports string
100
101     def read_old_config_files(self):
102         node.Node.read_old_config_files(self)
103         copy = self._copy_config_from_file
104         copy("introducer.furl", "client", "introducer.furl")
105         copy("helper.furl", "client", "helper.furl")
106         copy("key_generator.furl", "client", "key_generator.furl")
107         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
108         if os.path.exists(os.path.join(self.basedir, "no_storage")):
109             self.set_config("storage", "enabled", "false")
110         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
111             self.set_config("storage", "readonly", "true")
112         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
113             self.set_config("storage", "debug_discard", "true")
114         if os.path.exists(os.path.join(self.basedir, "run_helper")):
115             self.set_config("helper", "enabled", "true")
116
117     def init_introducer_client(self):
118         self.introducer_furl = self.get_config("client", "introducer.furl")
119         ic = IntroducerClient(self.tub, self.introducer_furl,
120                               self.nickname,
121                               str(allmydata.__full_version__),
122                               str(self.OLDEST_SUPPORTED_VERSION))
123         self.introducer_client = ic
124         # hold off on starting the IntroducerClient until our tub has been
125         # started, so we'll have a useful address on our RemoteReference, so
126         # that the introducer's status page will show us.
127         d = self.when_tub_ready()
128         def _start_introducer_client(res):
129             ic.setServiceParent(self)
130             # nodes that want to upload and download will need storage servers
131             ic.subscribe_to("storage")
132         d.addCallback(_start_introducer_client)
133         d.addErrback(log.err, facility="tahoe.init",
134                      level=log.BAD, umid="URyI5w")
135
136     def init_stats_provider(self):
137         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
138         self.stats_provider = StatsProvider(self, gatherer_furl)
139         self.add_service(self.stats_provider)
140         self.stats_provider.register_producer(self)
141
142     def get_stats(self):
143         return { 'node.uptime': time.time() - self.started_timestamp }
144
145     def init_lease_secret(self):
146         secret_s = self.get_or_create_private_config("secret", _make_secret)
147         self._lease_secret = base32.a2b(secret_s)
148
149     def init_storage(self):
150         # should we run a storage server (and publish it for others to use)?
151         if not self.get_config("storage", "enabled", True, boolean=True):
152             return
153         readonly = self.get_config("storage", "readonly", False, boolean=True)
154
155         storedir = os.path.join(self.basedir, self.STOREDIR)
156
157         data = self.get_config("storage", "reserved_space", None)
158         reserved = None
159         try:
160             reserved = parse_abbreviated_size(data)
161         except ValueError:
162             log.msg("[storage]reserved_space= contains unparseable value %s"
163                     % data)
164         if reserved is None:
165             reserved = 0
166         discard = self.get_config("storage", "debug_discard", False,
167                                   boolean=True)
168
169         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
170         if expire:
171             mode = self.get_config("storage", "expire.mode") # require a mode
172         else:
173             mode = self.get_config("storage", "expire.mode", "age")
174
175         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
176         if o_l_d is not None:
177             o_l_d = parse_duration(o_l_d)
178
179         cutoff_date = None
180         if mode == "cutoff-date":
181             cutoff_date = self.get_config("storage", "expire.cutoff_date")
182             cutoff_date = parse_date(cutoff_date)
183
184         sharetypes = []
185         if self.get_config("storage", "expire.immutable", True, boolean=True):
186             sharetypes.append("immutable")
187         if self.get_config("storage", "expire.mutable", True, boolean=True):
188             sharetypes.append("mutable")
189         expiration_sharetypes = tuple(sharetypes)
190
191         ss = StorageServer(storedir, self.nodeid,
192                            reserved_space=reserved,
193                            discard_storage=discard,
194                            readonly_storage=readonly,
195                            stats_provider=self.stats_provider,
196                            expiration_enabled=expire,
197                            expiration_mode=mode,
198                            expiration_override_lease_duration=o_l_d,
199                            expiration_cutoff_date=cutoff_date,
200                            expiration_sharetypes=expiration_sharetypes)
201         self.add_service(ss)
202
203         d = self.when_tub_ready()
204         # we can't do registerReference until the Tub is ready
205         def _publish(res):
206             furl_file = os.path.join(self.basedir, "private", "storage.furl")
207             furl = self.tub.registerReference(ss, furlFile=furl_file)
208             ri_name = RIStorageServer.__remote_name__
209             self.introducer_client.publish(furl, "storage", ri_name)
210         d.addCallback(_publish)
211         d.addErrback(log.err, facility="tahoe.init",
212                      level=log.BAD, umid="aLGBKw")
213
214     def init_client(self):
215         helper_furl = self.get_config("client", "helper.furl", None)
216         DEP = self.DEFAULT_ENCODING_PARAMETERS
217         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
218         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
219         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
220         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
221         self.convergence = base32.a2b(convergence_s)
222         self._node_cache = weakref.WeakValueDictionary() # uri -> node
223         self.add_service(History(self.stats_provider))
224         self.add_service(Uploader(helper_furl, self.stats_provider))
225         download_cachedir = os.path.join(self.basedir,
226                                          "private", "cache", "download")
227         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
228         self.download_cache.setServiceParent(self)
229         self.add_service(Downloader(self.stats_provider))
230         self.init_stub_client()
231
232     def init_stub_client(self):
233         def _publish(res):
234             # we publish an empty object so that the introducer can count how
235             # many clients are connected and see what versions they're
236             # running.
237             sc = StubClient()
238             furl = self.tub.registerReference(sc)
239             ri_name = RIStubClient.__remote_name__
240             self.introducer_client.publish(furl, "stub_client", ri_name)
241         d = self.when_tub_ready()
242         d.addCallback(_publish)
243         d.addErrback(log.err, facility="tahoe.init",
244                      level=log.BAD, umid="OEHq3g")
245
246     def get_history(self):
247         return self.getServiceNamed("history")
248
249     def init_control(self):
250         d = self.when_tub_ready()
251         def _publish(res):
252             c = ControlServer()
253             c.setServiceParent(self)
254             control_url = self.tub.registerReference(c)
255             self.write_private_config("control.furl", control_url + "\n")
256         d.addCallback(_publish)
257         d.addErrback(log.err, facility="tahoe.init",
258                      level=log.BAD, umid="d3tNXA")
259
260     def init_helper(self):
261         d = self.when_tub_ready()
262         def _publish(self):
263             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
264             h.setServiceParent(self)
265             # TODO: this is confusing. BASEDIR/private/helper.furl is created
266             # by the helper. BASEDIR/helper.furl is consumed by the client
267             # who wants to use the helper. I like having the filename be the
268             # same, since that makes 'cp' work smoothly, but the difference
269             # between config inputs and generated outputs is hard to see.
270             helper_furlfile = os.path.join(self.basedir,
271                                            "private", "helper.furl")
272             self.tub.registerReference(h, furlFile=helper_furlfile)
273         d.addCallback(_publish)
274         d.addErrback(log.err, facility="tahoe.init",
275                      level=log.BAD, umid="K0mW5w")
276
277     def init_key_gen(self, key_gen_furl):
278         d = self.when_tub_ready()
279         def _subscribe(self):
280             self.tub.connectTo(key_gen_furl, self._got_key_generator)
281         d.addCallback(_subscribe)
282         d.addErrback(log.err, facility="tahoe.init",
283                      level=log.BAD, umid="z9DMzw")
284
285     def _got_key_generator(self, key_generator):
286         self._key_generator = key_generator
287         key_generator.notifyOnDisconnect(self._lost_key_generator)
288
289     def _lost_key_generator(self):
290         self._key_generator = None
291
292     def get_servers(self, service_name):
293         """ Return frozenset of (peerid, versioned-rref) """
294         assert isinstance(service_name, str)
295         return self.introducer_client.get_peers(service_name)
296
297     def init_web(self, webport):
298         self.log("init_web(webport=%s)", args=(webport,))
299
300         from allmydata.webish import WebishServer
301         nodeurl_path = os.path.join(self.basedir, "node.url")
302         staticdir = self.get_config("node", "web.static", "public_html")
303         staticdir = os.path.expanduser(staticdir)
304         ws = WebishServer(self, webport, nodeurl_path, staticdir)
305         self.add_service(ws)
306
307     def init_ftp_server(self):
308         if self.get_config("ftpd", "enabled", False, boolean=True):
309             accountfile = self.get_config("ftpd", "accounts.file", None)
310             accounturl = self.get_config("ftpd", "accounts.url", None)
311             ftp_portstr = self.get_config("ftpd", "port", "8021")
312
313             from allmydata.frontends import ftpd
314             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
315             s.setServiceParent(self)
316
317     def init_sftp_server(self):
318         if self.get_config("sftpd", "enabled", False, boolean=True):
319             accountfile = self.get_config("sftpd", "accounts.file", None)
320             accounturl = self.get_config("sftpd", "accounts.url", None)
321             sftp_portstr = self.get_config("sftpd", "port", "8022")
322             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
323             privkey_file = self.get_config("sftpd", "host_privkey_file")
324
325             from allmydata.frontends import sftpd
326             s = sftpd.SFTPServer(self, accountfile, accounturl,
327                                  sftp_portstr, pubkey_file, privkey_file)
328             s.setServiceParent(self)
329
330     def _check_hotline(self, hotline_file):
331         if os.path.exists(hotline_file):
332             mtime = os.stat(hotline_file)[stat.ST_MTIME]
333             if mtime > time.time() - 120.0:
334                 return
335             else:
336                 self.log("hotline file too old, shutting down")
337         else:
338             self.log("hotline file missing, shutting down")
339         reactor.stop()
340
341     def get_all_peerids(self):
342         return self.introducer_client.get_all_peerids()
343     def get_nickname_for_peerid(self, peerid):
344         return self.introducer_client.get_nickname_for_peerid(peerid)
345
346     def get_permuted_peers(self, service_name, key):
347         """
348         @return: list of (peerid, connection,)
349         """
350         assert isinstance(service_name, str)
351         assert isinstance(key, str)
352         return self.introducer_client.get_permuted_peers(service_name, key)
353
354     def get_encoding_parameters(self):
355         return self.DEFAULT_ENCODING_PARAMETERS
356
357     def connected_to_introducer(self):
358         if self.introducer_client:
359             return self.introducer_client.connected_to_introducer()
360         return False
361
362     def get_renewal_secret(self):
363         return hashutil.my_renewal_secret_hash(self._lease_secret)
364
365     def get_cancel_secret(self):
366         return hashutil.my_cancel_secret_hash(self._lease_secret)
367
368     def debug_wait_for_client_connections(self, num_clients):
369         """Return a Deferred that fires (with None) when we have connections
370         to the given number of peers. Useful for tests that set up a
371         temporary test network and need to know when it is safe to proceed
372         with an upload or download."""
373         def _check():
374             current_clients = list(self.get_all_peerids())
375             return len(current_clients) >= num_clients
376         d = self.poll(_check, 0.5)
377         d.addCallback(lambda res: None)
378         return d
379
380
381     # these four methods are the primitives for creating filenodes and
382     # dirnodes. The first takes a URI and produces a filenode or (new-style)
383     # dirnode. The other three create brand-new filenodes/dirnodes.
384
385     def create_node_from_uri(self, u):
386         # this returns synchronously.
387         u = IURI(u)
388         u_s = u.to_string()
389         if u_s not in self._node_cache:
390             if IReadonlyNewDirectoryURI.providedBy(u):
391                 # new-style read-only dirnodes
392                 node = NewDirectoryNode(self).init_from_uri(u)
393             elif INewDirectoryURI.providedBy(u):
394                 # new-style dirnodes
395                 node = NewDirectoryNode(self).init_from_uri(u)
396             elif IFileURI.providedBy(u):
397                 if isinstance(u, LiteralFileURI):
398                     node = LiteralFileNode(u, self) # LIT
399                 else:
400                     key = base32.b2a(u.storage_index)
401                     cachefile = self.download_cache.get_file(key)
402                     node = FileNode(u, self, cachefile) # CHK
403             else:
404                 assert IMutableFileURI.providedBy(u), u
405                 node = MutableFileNode(self).init_from_uri(u)
406             self._node_cache[u_s] = node
407         return self._node_cache[u_s]
408
409     def create_empty_dirnode(self):
410         n = NewDirectoryNode(self)
411         d = n.create(self._generate_pubprivkeys)
412         d.addCallback(lambda res: n)
413         return d
414
415     def create_mutable_file(self, contents=""):
416         n = MutableFileNode(self)
417         d = n.create(contents, self._generate_pubprivkeys)
418         d.addCallback(lambda res: n)
419         return d
420
421     def _generate_pubprivkeys(self, key_size):
422         if self._key_generator:
423             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
424             def make_key_objs((verifying_key, signing_key)):
425                 v = rsa.create_verifying_key_from_string(verifying_key)
426                 s = rsa.create_signing_key_from_string(signing_key)
427                 return v, s
428             d.addCallback(make_key_objs)
429             return d
430         else:
431             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
432             # secs
433             signer = rsa.generate(key_size)
434             verifier = signer.get_verifying_key()
435             return verifier, signer
436
437     def upload(self, uploadable):
438         uploader = self.getServiceNamed("uploader")
439         return uploader.upload(uploadable, history=self.get_history())
440
441
442     def list_all_upload_statuses(self):
443         return self.get_history().list_all_upload_statuses()
444
445     def list_all_download_statuses(self):
446         return self.get_history().list_all_download_statuses()
447
448     def list_all_mapupdate_statuses(self):
449         return self.get_history().list_all_mapupdate_statuses()
450     def list_all_publish_statuses(self):
451         return self.get_history().list_all_publish_statuses()
452     def list_all_retrieve_statuses(self):
453         return self.get_history().list_all_retrieve_statuses()
454
455     def list_all_helper_statuses(self):
456         try:
457             helper = self.getServiceNamed("helper")
458         except KeyError:
459             return []
460         return helper.get_all_upload_statuses()