import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

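# Secrets are stored on disk as a single base32-encoded line; the trailing
# newline keeps the private config file newline-terminated.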
def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
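
# A minimal KeyGenerator usage sketch (hypothetical caller; assumes a running
# Twisted reactor, and use_keys() is a placeholder):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)  # small keys, e.g. for unit tests
#   d = kg.generate()
#   d.addCallback(lambda (verifying_key, signing_key):
#                 use_keys(verifying_key, signing_key))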

class Terminator(service.Service):
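    # Holds weak references to live client-side objects, so anything that is
    # garbage-collected drops out automatically; stopService() calls stop()
    # on whatever is still registered at shutdown.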
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, things will work as the server expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size).
    # 'needed' (k) is the number of shares required to reconstruct a file.
    # 'desired' (happy) means that we will abort an upload unless we can
    # allocate space for at least this many. 'total' (n) is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
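    # Worked example (arithmetic only, not new behavior): with k=3/n=10 and
    # 128 KiB segments, a 1 MiB file is cut into eight segments; each segment
    # is erasure-coded into 10 blocks, any 3 of which can rebuild it, so the
    # total stored data is about n/k = 3.3x the file size, spread over 10
    # servers when placement succeeds.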

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists and what its mtime is. If the
        # file does not exist, or has not been modified within the last 120
        # seconds (see _check_exit_trigger), the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)
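
        # The options parsed above live in the [storage] section of
        # tahoe.cfg; an illustrative (not prescriptive) example:
        #
        #   [storage]
        #   enabled = true
        #   reserved_space = 10G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2015-01-01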

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
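
        # These come from the [client] section of tahoe.cfg, e.g.
        # (illustrative values, matching the built-in defaults):
        #
        #   [client]
        #   shares.needed = 3
        #   shares.happy = 7
        #   shares.total = 10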

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res): # don't shadow 'self'; the callback result is unused
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res): # don't shadow 'self'; the callback result is unused
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception as e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()
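
    # To keep such a node alive, something must freshen the trigger file's
    # mtime at least every 120 seconds, e.g. (hypothetical keep-alive loop,
    # run in a separate process):
    #
    #   while True:
    #       os.utime(exit_trigger_file, None)  # i.e. 'touch'
    #       time.sleep(60)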

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
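
    # Typical test usage (sketch; start_upload() is a placeholder):
    #
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: start_upload())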


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)
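
    # Sketch of combining these primitives (cap strings elided; remember_cap()
    # is a placeholder):
    #
    #   d = client.create_dirnode()
    #   d.addCallback(lambda dirnode: remember_cap(dirnode.get_uri()))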

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)