import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding, \
     from_utf8_or_none
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret
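
# A usage sketch for SecretHolder (hypothetical; this module builds the real
# one in init_secrets() below). Secrets are stored base32-encoded on disk,
# and only hash-derived values are ever sent to storage servers:
#
#   lease = base32.a2b(_make_secret().strip())
#   convergence = base32.a2b(_make_secret().strip())
#   holder = SecretHolder(lease, convergence)
#   renew = holder.get_renewal_secret()   # one-way derivation of the lease secret
#   cancel = holder.get_cancel_secret()   # likewise derived, never the secret itself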

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
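
# A minimal usage sketch (hypothetical; the NodeMaker normally drives this
# when creating mutable files):
#
#   keygen = KeyGenerator()
#   keygen.set_default_keysize(522)   # e.g. a small key so tests run fast
#   d = keygen.generate()             # Deferred -> (verifying_key, signing_key)
#   d.addCallback(lambda (vk, sk): do_something_with(vk, sk))
#
# do_something_with() is a placeholder, not a function in this codebase.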

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as that server expects.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size).
    # 'needed' (k) is the number of shares required to reconstruct a file.
    # 'desired' (happy) means that we will abort an upload unless we can
    # allocate space for at least this many. 'total' (n) is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
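
    # Worked example with the defaults above: a file is encoded into n=10
    # shares, any k=3 of which suffice to reconstruct it, so the stored
    # total is about n/k (~3.3x) the original size; the upload is aborted
    # unless space can be allocated for at least happy=7 of those shares.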

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists and what its mtime is. If the
        # file does not exist, or has not been modified within the timeout
        # (120 seconds; see _check_exit_trigger below), the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)
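
        # Keep-alive sketch (hypothetical, e.g. from a test harness's shell;
        # BASEDIR stands for this node's base directory):
        #
        #   touch BASEDIR/exit_trigger                   # arm the watchdog
        #   while still_running; do touch BASEDIR/exit_trigger; sleep 30; done
        #
        # Once the harness stops touching the file, _check_exit_trigger()
        # will stop the reactor within about two minutes.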

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()
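
    # Note on the seed above: clients order storage servers by hashing each
    # server's permutation-seed together with the file's storage index (the
    # "permuted ring"; see storage_client.py). Changing a server's seed
    # would move all of its existing shares to a different position in every
    # client's search order, which is why the TubID-based seed is preserved
    # for pre-#466 servers.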

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

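        # A hypothetical tahoe.cfg fragment exercising the options parsed
        # above (values are illustrative, not defaults):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 10G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2011-01-01
        #   expire.immutable = true
        #   expire.mutable = true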
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        ps = self.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple([p.strip() for p in ps if p != ""])
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
        self.storage_broker = sb
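
        # A hypothetical tahoe.cfg fragment for the preferred-servers knob
        # parsed above (the serverids shown are illustrative placeholders):
        #
        #   [client]
        #   peers.preferred = v0-aaaaaaaa...,v0-bbbbbbbb...
        #
        # Preferred servers are moved to the front of the permuted server
        # list, so shares tend to be placed on (and fetched from) them first.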

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
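
        # A hypothetical tahoe.cfg fragment selecting the newer mutable-file
        # format (the comparison above is case-insensitive; anything other
        # than "MDMF" falls back to SDMF):
        #
        #   [client]
        #   mutable.format = MDMF
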
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        # when_tub_ready() fires with the node itself, so the 'self'
        # parameter below rebinds to this Client instance; the shadowing is
        # confusing but deliberate
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        # as in init_helper, the 'self' parameter rebinds to this Client
        # instance when when_tub_ready() fires
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("ftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = from_utf8_or_none(
                self.get_config("sftpd", "accounts.file", None))
            if accountfile:
                accountfile = abspath_expanduser_unicode(accountfile, base=self.basedir)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = from_utf8_or_none(self.get_config("sftpd", "host_pubkey_file"))
            privkey_file = from_utf8_or_none(self.get_config("sftpd", "host_privkey_file"))
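
            # A hypothetical tahoe.cfg fragment for this frontend (paths and
            # port are illustrative; the accounts file and host keys must
            # already exist):
            #
            #   [sftpd]
            #   enabled = true
            #   port = tcp:8022
            #   host_pubkey_file = private/ssh_host_rsa_key.pub
            #   host_privkey_file = private/ssh_host_rsa_key
            #   accounts.file = private/accounts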

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
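
    # A usage sketch for the test helper above (hypothetical test code;
    # 'client' and 'DATA' are placeholders):
    #
    #   from allmydata.immutable import upload
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: client.upload(upload.Data(DATA, None)))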


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri;
        # instead we may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)