import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding, \
     from_utf8_or_none
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

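# A minimal usage sketch for KeyGenerator (illustrative only, not part of
# this module; the 522-bit size is just an example of a small test key):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)    # tests use smaller keys to run faster
#   d = kg.generate()              # Deferred -> (verifying_key, signing_key)
#   def _got_keypair((verifying_key, signing_key)):
#       pass                       # hand the pair to whatever needs it
#   d.addCallback(_got_keypair)
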
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, things will work the way it expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size),
    # stored under the keys (k, happy, n, max_segment_size). 'needed' (k) is
    # the number of shares required to reconstruct a file. 'desired' (happy)
    # means that we will abort an upload unless we can allocate space for at
    # least this many. 'total' (n) is the total number of shares created by
    # encoding. If every server has room, this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

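    # These defaults can be overridden in tahoe.cfg; a sketch with
    # illustrative values (the option names match what init_client() reads
    # below):
    #
    #   [client]
    #   shares.needed = 3
    #   shares.happy = 7
    #   shares.total = 10
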
    def __init__(self, basedir="."):
        #print "Client.__init__(%r)" % (basedir,)
        node.Node.__init__(self, basedir)
        self.connected_enough_d = defer.Deferred()
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_magic_folder()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists and what its mtime is. If the
        # file does not exist or has not been modified for 120 seconds (see
        # _check_exit_trigger below), the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

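    # A sketch of the [storage] options read above, as they might appear in
    # tahoe.cfg (values are illustrative; see docs/configuration.rst):
    #
    #   [storage]
    #   enabled = true
    #   readonly = false
    #   reserved_space = 1G
    #   expire.enabled = true
    #   expire.mode = cutoff-date
    #   expire.cutoff_date = 2015-01-01
    #   expire.immutable = true
    #   expire.mutable = true
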
    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        ps = self.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple([p.strip() for p in ps if p != ""])

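        # With the default encoding parameters (k=3, happy=7), the threshold
        # computed below is min(3, 7+1) == 3; connected_enough_d should then
        # fire once that many servers are connected. (Worked example only;
        # the firing logic itself lives in StorageFarmBroker.)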
        connection_threshold = min(self.encoding_params["k"],
                                   self.encoding_params["happy"] + 1)

        sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
                                              self.connected_enough_d, preferred_peers=preferred_peers)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

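    # The webport value is a Twisted "strports" description; an illustrative
    # tahoe.cfg sketch:
    #
    #   [node]
    #   web.port = tcp:3456:interface=127.0.0.1
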
    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("ftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("sftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

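    # A sketch of the [sftpd] options read above (paths are placeholders,
    # relative to the node's basedir):
    #
    #   [sftpd]
    #   enabled = true
    #   port = 8022
    #   host_pubkey_file = private/ssh_host_rsa_key.pub
    #   host_privkey_file = private/ssh_host_rsa_key
    #   accounts.file = private/accounts
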
    def init_magic_folder(self):
        #print "init_magic_folder"
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            raise OldConfigOptionError("The [drop_upload] section must be renamed to [magic_folder].\n"
                                       "See docs/frontends/magic-folder.rst for more information.")

        if self.get_config("magic_folder", "enabled", False, boolean=True):
            #print "magic folder enabled"
            upload_dircap = self.get_private_config("magic_folder_dircap")
            collective_dircap = self.get_private_config("collective_dircap")

            local_dir_config = self.get_config("magic_folder", "local.directory").decode("utf-8")
            local_dir = abspath_expanduser_unicode(local_dir_config, base=self.basedir)

            dbfile = os.path.join(self.basedir, "private", "magicfolderdb.sqlite")
            dbfile = abspath_expanduser_unicode(dbfile)

            from allmydata.frontends import magic_folder
            umask = self.get_config("magic_folder", "download.umask", 0077)
            if isinstance(umask, basestring):
                # values read from tahoe.cfg arrive as strings; interpret
                # them as octal, matching the 0077 default above
                umask = int(umask, 8)
            s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
            self._magic_folder = s
            s.setServiceParent(self)
            s.startService()

            # start processing the upload queue when we've connected to
            # enough servers
            self.connected_enough_d.addCallback(lambda ign: s.ready())

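    # A sketch of the [magic_folder] configuration read above; note that the
    # two dircaps come from BASEDIR/private/magic_folder_dircap and
    # BASEDIR/private/collective_dircap rather than from tahoe.cfg:
    #
    #   [magic_folder]
    #   enabled = true
    #   local.directory = ~/magic
    #   download.umask = 077
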
    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)
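
# A minimal usage sketch (illustrative only): uploading a string through a
# running Client, assuming the Data uploadable from allmydata.immutable.upload
# and this client's own convergence secret:
#
#   from allmydata.immutable.upload import Data
#   d = client.upload(Data("example contents", client.convergence))
#   # d fires with an upload-results object describing the new file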