import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

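# Illustrative sketch (not used by this module): how a test might shrink the
# default keysize before requesting a keypair. The 522-bit size is an
# assumption chosen only to make the example fast; use any size your
# pycryptopp build accepts.
def _example_keygen_usage():
    kg = KeyGenerator()
    kg.set_default_keysize(522) # smaller keys make unit tests run faster
    d = kg.generate() # Deferred fires with (verifyingkey, signingkey)
    def _got_keypair((verifying_key, signing_key)):
        # the signing key can re-derive its own verifying key
        return verifying_key, signing_key.get_verifying_key()
    d.addCallback(_got_keypair)
    return d
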
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # These are the default encoding parameters. 'k' ("needed") is the
    # number of shares required to reconstruct a file. 'happy' ("desired")
    # means that we will abort an upload unless we can allocate space for
    # at least this many. 'n' ("total") is the total number of shares
    # created by encoding. If everybody has room then this is how many we
    # will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

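    # Worked example (illustrative only; nothing in this module calls it):
    # the storage expansion implied by a set of encoding parameters. With
    # the defaults above, n/k = 10/3 ~= 3.3x the original bytes are stored
    # across the grid, and any 3 of the 10 shares suffice to reconstruct
    # the file.
    @staticmethod
    def _example_expansion_factor(params):
        return float(params["n"]) / params["k"]
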
    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

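    # Illustrative note (an assumption about how subscribers treat these
    # values): because the sequence number is persisted and bumped on every
    # call, announcements published after a restart supersede stale ones,
    # e.g. _sequencer() -> (1, <nonce>), then (2, <nonce>) on the next call.
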
    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def _maybe_create_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

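    # Illustrative sketch (an assumption, not the code the real peer
    # selection uses): clients build the "permuted ring" by hashing each
    # server's permutation seed together with a file's storage index, so a
    # stable seed keeps a server's position in the ring stable across
    # restarts.
    @staticmethod
    def _example_permuted_order(seeds, storage_index):
        import hashlib
        return sorted(seeds,
                      key=lambda seed:
                          hashlib.sha256(seed + storage_index).digest())
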
    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        self._maybe_create_node_key()

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

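        # Illustrative tahoe.cfg fragment for the lease-expiration options
        # read above (the values shown are examples, not defaults):
        #
        #   [storage]
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2011-01-01
        #   expire.immutable = true
        #   expire.mutable = true
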
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        # name the callback argument 'res' so it doesn't shadow the outer
        # 'self'
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

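    # Illustrative sketch (test-harness usage, an assumption): a supervisor
    # keeps the node alive by touching the hotline file more often than
    # every 120 seconds; once it stops touching the file, the TimerService
    # started in __init__ notices and shuts the node down.
    @staticmethod
    def _example_touch_hotline(basedir):
        f = open(os.path.join(basedir,
                              Client.SUICIDE_PREVENTION_HOTLINE_FILE), "w")
        f.close()
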
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d

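    # Illustrative sketch (test usage, an assumption): gate an upload on
    # enough servers being connected, e.g.
    #
    #   d = client.debug_wait_for_client_connections(10)
    #   d.addCallback(lambda ign: client.upload(uploadable))
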

    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

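    # Illustrative sketch (an assumption, not called anywhere): the typical
    # round trip through the primitives above -- create a mutable directory,
    # remember its cap string, and reattach to it later from the cap alone
    # (get_uri() returning the write-cap string is an assumption here).
    def _example_make_and_reattach(self):
        d = self.create_dirnode()
        def _created(dirnode):
            writecap = dirnode.get_uri()
            # on a later run, the cap string alone recovers the node
            return self.create_node_from_uri(writecap)
        d.addCallback(_created)
        return d
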
    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)