import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding, \
     from_utf8_or_none
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
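
    # A minimal usage sketch (illustrative only, not part of this module;
    # the 522-bit size below is a hypothetical test value):
    #
    #   kg = KeyGenerator()
    #   kg.set_default_keysize(522)   # e.g. in unit tests, for speed
    #   d = kg.generate()
    #   d.addCallback(lambda (verifier, signer): ...)
    #
    # generate() always returns a Deferred, whether the keypair comes from a
    # remote generator or is created locally.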

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as it expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size)
    # values, stored under the keys (k, happy, n, max_segment_size).
    # 'needed' (k) is the number of shares required to reconstruct a file.
    # 'desired' (happy) means that we will abort an upload unless we can
    # allocate space for at least this many. 'total' (n) is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
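    # For example, with the defaults above each file is split into segments
    # of at most 128 KiB, each segment is encoded into n=10 shares of which
    # any k=3 suffice to reconstruct it, and an upload is aborted unless
    # space can be allocated for at least happy=7 shares.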

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to see
        # whether the file still exists, and what its mtime is. If the file does not
        # exist or has not been modified for a given timeout, the node will exit.
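        # (The staleness timeout is 120 seconds; see _check_exit_trigger
        # below.)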
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
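        # Each announcement we publish carries the (seqnum, nonce) pair
        # returned here; the persisted, monotonically increasing seqnum lets
        # subscribers recognize which of several announcements is newest.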
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key.
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

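        # reserved_space is given as an abbreviated size string; something
        # like "reserved_space = 1G" in tahoe.cfg would be typical (an
        # illustrative value, not taken from this file).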
        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)
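            # parse_date expects an ISO-style date such as "2011-01-01"
            # (an illustrative value).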

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
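        # e.g. a single-server grid might set shares.needed = 1, shares.happy
        # = 1, and shares.total = 1 in tahoe.cfg (an illustrative
        # configuration, not a recommendation).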

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
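        # Note: when_tub_ready() fires with the node itself, so the 'self'
        # parameter of _publish below is the same object as the enclosing
        # client (init_key_gen's _subscribe relies on the same pattern).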
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("ftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("sftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
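
    # e.g. a test might wait for five servers before proceeding:
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: do_the_upload())
    # (do_the_upload is a hypothetical test helper)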


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.
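    # An illustrative (hypothetical) caller:
    #   node = client.create_node_from_uri(cap)       # cap string -> node
    #   d1 = client.create_dirnode()                  # new mutable directory
    #   d2 = client.create_mutable_file("contents")   # new mutable file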

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)