]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
more refactoring: move get_all_serverids() and get_nickname_for_serverid() from Clien...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.util.time_format import parse_duration, parse_date
24 from allmydata.uri import LiteralFileURI
25 from allmydata.dirnode import NewDirectoryNode
26 from allmydata.mutable.filenode import MutableFileNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
31
32 KiB=1024
33 MiB=1024*KiB
34 GiB=1024*MiB
35 TiB=1024*GiB
36 PiB=1024*TiB
37
class StubClient(Referenceable):
    # An empty remotely-referenceable object. It is published to the
    # introducer (see Client.init_stub_client) so the introducer can count
    # connected clients and observe their version strings, without exposing
    # any real functionality.
    implements(RIStubClient)
40
def _make_secret():
    """Generate a fresh random secret: base32-encoded, newline-terminated.

    The trailing newline makes the value convenient to store in a private
    config file (see get_or_create_private_config callers).
    """
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
43
44 class Client(node.Node, pollmixin.PollMixin):
45     implements(IStatsProducer)
46
47     PORTNUMFILE = "client.port"
48     STOREDIR = 'storage'
49     NODETYPE = "client"
50     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
51
52     # This means that if a storage server treats me as though I were a
53     # 1.0.0 storage client, it will work as they expect.
54     OLDEST_SUPPORTED_VERSION = "1.0.0"
55
56     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
57     # is the number of shares required to reconstruct a file. 'desired' means
58     # that we will abort an upload unless we can allocate space for at least
59     # this many. 'total' is the total number of shares created by encoding.
60     # If everybody has room then this is is how many we will upload.
61     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
62                                    "happy": 7,
63                                    "n": 10,
64                                    "max_segment_size": 128*KiB,
65                                    }
66
    def __init__(self, basedir="."):
        """Build a Tahoe client node rooted at 'basedir'.

        Initialization order matters: the introducer client must exist
        before storage/client services publish themselves, and the web
        frontend must come last so it can use getServiceNamed() to find the
        other services.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # give this instance its own copy, so init_client() can apply
        # tahoe.cfg overrides without mutating the class-level default
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # test aid: if the 'hotline' file exists, the node shuts itself down
        # once the file goes stale (see _check_hotline)
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
101
102     def read_old_config_files(self):
103         node.Node.read_old_config_files(self)
104         copy = self._copy_config_from_file
105         copy("introducer.furl", "client", "introducer.furl")
106         copy("helper.furl", "client", "helper.furl")
107         copy("key_generator.furl", "client", "key_generator.furl")
108         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
109         if os.path.exists(os.path.join(self.basedir, "no_storage")):
110             self.set_config("storage", "enabled", "false")
111         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
112             self.set_config("storage", "readonly", "true")
113         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
114             self.set_config("storage", "debug_discard", "true")
115         if os.path.exists(os.path.join(self.basedir, "run_helper")):
116             self.set_config("helper", "enabled", "true")
117
    def init_introducer_client(self):
        """Create the IntroducerClient and subscribe to storage announcements.

        Reads [client]introducer.furl (required: get_config with no default
        will complain if it is missing). The client is not started until the
        Tub is ready.
        """
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
136
137     def init_stats_provider(self):
138         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
139         self.stats_provider = StatsProvider(self, gatherer_furl)
140         self.add_service(self.stats_provider)
141         self.stats_provider.register_producer(self)
142
143     def get_stats(self):
144         return { 'node.uptime': time.time() - self.started_timestamp }
145
146     def init_lease_secret(self):
147         secret_s = self.get_or_create_private_config("secret", _make_secret)
148         self._lease_secret = base32.a2b(secret_s)
149
    def init_storage(self):
        """Optionally start a StorageServer and publish its FURL.

        Controlled by the [storage] section of tahoe.cfg, including the
        expire.* lease-expiration settings. Publication to the introducer is
        deferred until the Tub is ready, since registerReference requires a
        started Tub.
        """
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            # unset or unparseable: reserve nothing
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        # lease expiration: when enabled, a mode is mandatory; otherwise we
        # fall back to "age" just to have a defined value
        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        # which share types are subject to expiration (default: both)
        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
214
215     def init_client(self):
216         helper_furl = self.get_config("client", "helper.furl", None)
217         DEP = self.DEFAULT_ENCODING_PARAMETERS
218         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
219         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
220         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
221         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
222         self.convergence = base32.a2b(convergence_s)
223         self._node_cache = weakref.WeakValueDictionary() # uri -> node
224
225         self.init_client_storage_broker()
226         self.add_service(History(self.stats_provider))
227         self.add_service(Uploader(helper_furl, self.stats_provider))
228         download_cachedir = os.path.join(self.basedir,
229                                          "private", "cache", "download")
230         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
231         self.download_cache.setServiceParent(self)
232         self.add_service(Downloader(self.stats_provider))
233         self.init_stub_client()
234
    def init_client_storage_broker(self):
        """Create self.storage_broker, the StorageFarmBroker used by
        Uploader/Downloader (and everybody else who needs storage servers).
        """
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        self.storage_broker = sb = storage_client.StorageFarmBroker()

        # load static server specifications from tahoe.cfg, if any
        # (disabled for now: kept below as a sketch of the intended feature)
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
265
    def get_storage_broker(self):
        """Return the StorageFarmBroker created by init_client_storage_broker()."""
        return self.storage_broker
268
269     def init_stub_client(self):
270         def _publish(res):
271             # we publish an empty object so that the introducer can count how
272             # many clients are connected and see what versions they're
273             # running.
274             sc = StubClient()
275             furl = self.tub.registerReference(sc)
276             ri_name = RIStubClient.__remote_name__
277             self.introducer_client.publish(furl, "stub_client", ri_name)
278         d = self.when_tub_ready()
279         d.addCallback(_publish)
280         d.addErrback(log.err, facility="tahoe.init",
281                      level=log.BAD, umid="OEHq3g")
282
    def get_history(self):
        """Return the History service (added in init_client)."""
        return self.getServiceNamed("history")
285
286     def init_control(self):
287         d = self.when_tub_ready()
288         def _publish(res):
289             c = ControlServer()
290             c.setServiceParent(self)
291             control_url = self.tub.registerReference(c)
292             self.write_private_config("control.furl", control_url + "\n")
293         d.addCallback(_publish)
294         d.addErrback(log.err, facility="tahoe.init",
295                      level=log.BAD, umid="d3tNXA")
296
297     def init_helper(self):
298         d = self.when_tub_ready()
299         def _publish(self):
300             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
301             h.setServiceParent(self)
302             # TODO: this is confusing. BASEDIR/private/helper.furl is created
303             # by the helper. BASEDIR/helper.furl is consumed by the client
304             # who wants to use the helper. I like having the filename be the
305             # same, since that makes 'cp' work smoothly, but the difference
306             # between config inputs and generated outputs is hard to see.
307             helper_furlfile = os.path.join(self.basedir,
308                                            "private", "helper.furl")
309             self.tub.registerReference(h, furlFile=helper_furlfile)
310         d.addCallback(_publish)
311         d.addErrback(log.err, facility="tahoe.init",
312                      level=log.BAD, umid="K0mW5w")
313
314     def init_key_gen(self, key_gen_furl):
315         d = self.when_tub_ready()
316         def _subscribe(self):
317             self.tub.connectTo(key_gen_furl, self._got_key_generator)
318         d.addCallback(_subscribe)
319         d.addErrback(log.err, facility="tahoe.init",
320                      level=log.BAD, umid="z9DMzw")
321
    def _got_key_generator(self, key_generator):
        # connectTo callback: remember the remote key generator, and arrange
        # to forget it if the connection is lost
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
325
    def _lost_key_generator(self):
        # disconnect callback: _generate_pubprivkeys falls back to local
        # key generation while this is None
        self._key_generator = None
328
    def get_servers(self, service_name):
        """ Return frozenset of (peerid, versioned-rref) """
        # service_name is a plain announcement-type string, e.g. "storage"
        assert isinstance(service_name, str)
        return self.introducer_client.get_peers(service_name)
333
334     def init_web(self, webport):
335         self.log("init_web(webport=%s)", args=(webport,))
336
337         from allmydata.webish import WebishServer
338         nodeurl_path = os.path.join(self.basedir, "node.url")
339         staticdir = self.get_config("node", "web.static", "public_html")
340         staticdir = os.path.expanduser(staticdir)
341         ws = WebishServer(self, webport, nodeurl_path, staticdir)
342         self.add_service(ws)
343
344     def init_ftp_server(self):
345         if self.get_config("ftpd", "enabled", False, boolean=True):
346             accountfile = self.get_config("ftpd", "accounts.file", None)
347             accounturl = self.get_config("ftpd", "accounts.url", None)
348             ftp_portstr = self.get_config("ftpd", "port", "8021")
349
350             from allmydata.frontends import ftpd
351             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
352             s.setServiceParent(self)
353
354     def init_sftp_server(self):
355         if self.get_config("sftpd", "enabled", False, boolean=True):
356             accountfile = self.get_config("sftpd", "accounts.file", None)
357             accounturl = self.get_config("sftpd", "accounts.url", None)
358             sftp_portstr = self.get_config("sftpd", "port", "8022")
359             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
360             privkey_file = self.get_config("sftpd", "host_privkey_file")
361
362             from allmydata.frontends import sftpd
363             s = sftpd.SFTPServer(self, accountfile, accounturl,
364                                  sftp_portstr, pubkey_file, privkey_file)
365             s.setServiceParent(self)
366
367     def _check_hotline(self, hotline_file):
368         if os.path.exists(hotline_file):
369             mtime = os.stat(hotline_file)[stat.ST_MTIME]
370             if mtime > time.time() - 120.0:
371                 return
372             else:
373                 self.log("hotline file too old, shutting down")
374         else:
375             self.log("hotline file missing, shutting down")
376         reactor.stop()
377
    def get_encoding_parameters(self):
        """Return the erasure-coding parameter dict (k/happy/n/
        max_segment_size), possibly overridden by tahoe.cfg in init_client()."""
        return self.DEFAULT_ENCODING_PARAMETERS
380
381     def connected_to_introducer(self):
382         if self.introducer_client:
383             return self.introducer_client.connected_to_introducer()
384         return False
385
    def get_renewal_secret(self):
        """Derive this node's lease-renewal secret from the master lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)
388
    def get_cancel_secret(self):
        """Derive this node's lease-cancel secret from the master lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)
391
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            # poll the storage broker until enough servers are known
            current_clients = list(self.storage_broker.get_all_serverids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
403
404
405     # these four methods are the primitives for creating filenodes and
406     # dirnodes. The first takes a URI and produces a filenode or (new-style)
407     # dirnode. The other three create brand-new filenodes/dirnodes.
408
    def create_node_from_uri(self, u):
        """Turn a URI (adaptable to IURI) into the matching node object.

        Returns synchronously. Nodes are weakly cached by URI string, so
        repeated lookups of the same URI yield the same live node object.
        """
        # this returns synchronously.
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    # CHK files get a per-storage-index download cache file
                    key = base32.b2a(u.storage_index)
                    cachefile = self.download_cache.get_file(key)
                    node = FileNode(u, self, cachefile) # CHK
            else:
                # the only remaining possibility is a mutable file
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
432
433     def create_empty_dirnode(self):
434         n = NewDirectoryNode(self)
435         d = n.create(self._generate_pubprivkeys)
436         d.addCallback(lambda res: n)
437         return d
438
439     def create_mutable_file(self, contents=""):
440         n = MutableFileNode(self)
441         d = n.create(contents, self._generate_pubprivkeys)
442         d.addCallback(lambda res: n)
443         return d
444
    def _generate_pubprivkeys(self, key_size):
        """Produce an RSA (verifying_key, signing_key) pair of 'key_size' bits.

        If a remote key-generator is connected, delegate to it and return a
        Deferred; otherwise generate locally and return the tuple directly.
        Callers must be prepared for either form.
        """
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                # the remote side sends serialized keys; rehydrate them
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer
460
    def upload(self, uploadable):
        """Upload 'uploadable' via the Uploader service; returns whatever
        Uploader.upload returns, with this node's History attached."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
464
465
    # the following status-listing methods all delegate to the History
    # service (see get_history), which records recent operation statuses

    def list_all_upload_statuses(self):
        return self.get_history().list_all_upload_statuses()

    def list_all_download_statuses(self):
        return self.get_history().list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        return self.get_history().list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        return self.get_history().list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        return self.get_history().list_all_retrieve_statuses()
478
479     def list_all_helper_statuses(self):
480         try:
481             helper = self.getServiceNamed("helper")
482         except KeyError:
483             return []
484         return helper.get_all_upload_statuses()