import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding, \
     from_utf8_or_none
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
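    """Return a new random secret, base32-encoded with a trailing newline so
    it can be written directly to a private config file."""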
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
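    """I hold a lease secret, from which per-server renewal and cancel
    secrets are derived, and a convergence secret, which keys convergent
    encryption so that repeated uploads of the same file by this client
    produce the same shares."""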
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
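    # Illustrative usage (the keysize shown is arbitrary):
    #   kg = KeyGenerator()
    #   d = kg.generate(keysize=2048)
    #   d.addCallback(lambda (verifyingkey, signingkey): ...)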
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

class Terminator(service.Service):
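    """I keep weak references to registered clients, and call stop() on each
    one that is still alive when my service is stopped."""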
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as that server expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size).
    # 'needed' (k) is the number of shares required to reconstruct a file.
    # 'desired' (happy) means that we will abort an upload unless we can
    # allocate space for at least this many. 'total' (n) is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
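    # For example, with these defaults a file is erasure-coded into n=10
    # shares, any k=3 of which suffice to reconstruct it, and an upload is
    # only considered successful if the shares end up spread across at
    # least 7 distinct servers.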

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
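        # fires (at most once) when we are connected to enough storage
        # servers; see init_client_storage_broker() for the threshold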
        self.connected_enough_d = defer.Deferred()
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists and what its mtime is. If the
        # file does not exist or has not been modified within the timeout
        # (120 seconds; see _check_exit_trigger), the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
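        """Return a (seqnum, nonce) pair for introducer announcements. The
        sequence number is persisted in BASEDIR/announcement-seqnum and
        incremented on each call, so announcements remain ordered across
        node restarts."""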
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
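        # The lease secret is used to derive the renewal/cancel secrets we
        # present to storage servers; the convergence secret makes convergent
        # encryption per-client (see SecretHolder).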
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        ps = self.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple([p.strip() for p in ps if p != ""])

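        # Heuristic: we are "connected enough" to start work once we can
        # reach min(k, happy+1) servers: enough to download, or just over
        # the happiness threshold needed for an upload.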
        connection_threshold = min(self.encoding_params["k"],
                                   self.encoding_params["happy"] + 1)

        sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
                                              self.connected_enough_d, preferred_peers=preferred_peers)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("ftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("sftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()

                # start processing the upload queue when we've connected to
                # enough servers
                self.connected_enough_d.addCallback(s.ready)
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_exit_trigger(self, exit_trigger_file):
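        # exit unless the trigger file still exists and has been touched
        # within the last 120 seconds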
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)