import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

# Secrets are stored base32-encoded, with a trailing newline so they can be
# written directly into a private config file.
def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

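# Illustrative sketch (not part of the original module): both lease-related
# secrets are derived from the one stored lease secret, so only that single
# value needs to be persisted:
#
#   lease = base32.a2b(_make_secret().strip())
#   convergence = base32.a2b(_make_secret().strip())
#   sh = SecretHolder(lease, convergence)
#   renewal = sh.get_renewal_secret()  # my_renewal_secret_hash(lease)
#   cancel = sh.get_cancel_secret()    # my_cancel_secret_hash(lease)
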
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

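# A minimal usage sketch (illustrative; the sign/verify method names are
# assumptions based on pycryptopp's RSA API):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)   # tests use tiny keys so they run fast
#   d = kg.generate()
#   def _got((verifier, signer)):
#       sig = signer.sign("some bytes")
#       assert verifier.verify("some bytes", sig)
#   d.addCallback(_got)
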
# Terminator keeps weak references to registered clients and calls stop()
# on each of them when the node shuts down.
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that a storage server can treat me as though I were a
    # 1.0.0 storage client, and everything will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of encoding parameters. 'k' ("needed") is the number
    # of shares required to reconstruct a file. 'happy' is a threshold: we
    # will abort an upload unless we can allocate space for at least this
    # many shares. 'n' ("total") is the total number of shares created by
    # encoding; if everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
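
    # Worked example (illustrative): with k=3 and n=10, a 1 GiB file is
    # encoded into 10 shares of roughly 1/3 GiB each (plus a little hash
    # overhead), and any 3 of those shares suffice to reconstruct the
    # file. The upload is aborted unless space can be allocated for at
    # least 7 of the 10 shares.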

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
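        # Called when we publish an announcement: returns a monotonically
        # increasing sequence number (persisted across restarts in
        # BASEDIR/announcement-seqnum) plus a fresh random nonce, so that
        # receivers can tell new announcements from stale ones.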
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once; on all subsequent runs we re-use the
        # existing key
        def _make_key():
            sk_vs, vk_vs = keyutil.make_keypair()
            return sk_vs + "\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk, vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs + "\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
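        # e.g. "v0-" followed by 52 base32 characters for the 32-byte
        # ed25519 verifying key that keyutil produces (key type assumed)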
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-" + base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
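        # The seed we return is published in our storage announcement;
        # clients hash it together with each storage index to decide our
        # position in the permuted ring during server selection (summary
        # only; see storage_client.py for the actual ordering code).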
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

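        # A sample tahoe.cfg [storage] section exercising the options read
        # above (values are illustrative):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 10G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2013-01-01
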
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
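        # These map to tahoe.cfg entries (values shown are the defaults):
        #
        #   [client]
        #   shares.needed = 3
        #   shares.happy = 7
        #   shares.total = 10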

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        # when_tub_ready() fires with the node itself, so the callback
        # parameter deliberately (if confusingly) rebinds 'self'
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        # as in init_helper, when_tub_ready() fires with the node, hence
        # the 'self' parameter
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_hotline(self, hotline_file):
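        # Runs once per second while the hotline file exists. Whoever
        # created the file must re-touch it at least every 120 seconds,
        # otherwise we conclude that the controlling process is gone and
        # shut the node down.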
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)
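
# An illustrative sketch (not part of the original module) of driving the
# upload API from Python; upload.Data wraps a string as an IUploadable:
#
#   from allmydata.immutable.upload import Data
#   c = Client(basedir="/path/to/node")  # an already-configured node dir
#   d = c.upload(Data("hello, tahoe", convergence=c.convergence))
#   def _done(results):
#       print "upload complete"
#   d.addCallback(_done)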