import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret
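
# Note: the renewal and cancel secrets are both derived (via tagged hashes in
# hashutil) from the single lease secret, so a node only has to persist that
# one value, in BASEDIR/private/secret (see init_secrets below).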

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

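# Illustrative usage sketch (not part of the module; assumes a running
# Twisted reactor; the 522-bit keysize mirrors what the unit tests use for
# speed):
#
#   keygen = KeyGenerator()
#   keygen.set_default_keysize(522)
#   d = keygen.generate()
#   def _got_keys((verifying_key, signing_key)):
#       # both are pycryptopp key objects, ready to verify/sign
#       return signing_key
#   d.addCallback(_got_keys)
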
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size),
    # stored under the keys (k, happy, n, max_segment_size). 'needed' (k) is
    # the number of shares required to reconstruct a file. 'desired' (happy)
    # means that we will abort an upload unless we can allocate space for at
    # least this many. 'total' (n) is the total number of shares created by
    # encoding. If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
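    # Worked example (illustrative): with k=3/n=10, each segment (at most
    # 128 KiB) is erasure-coded into n=10 blocks of about segsize/3 each;
    # any 3 blocks recover the segment, so the total share data stored is
    # roughly n/k = 10/3 times the file size.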

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        # If the node sees an exit_trigger file, it will poll every second to
        # see whether the file still exists and what its mtime is. If the
        # file does not exist, or has not been modified in the last 120
        # seconds (see _check_exit_trigger), the node will exit.
        exit_trigger_file = os.path.join(self.basedir,
                                         self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)
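        # Usage sketch (illustrative): a test harness keeps this node alive
        # by re-touching BASEDIR/exit_trigger more often than every 120
        # seconds; when the harness exits and the touches stop, the node
        # shuts itself down.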

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-"+base32.b2a(vk_bytes)

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

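    # Sketch of how clients use this seed (simplified; the exact hash below
    # is an assumption, and the real ordering lives in the client-side
    # server-selection code): for each file, servers are sorted by a hash of
    # (permutation seed, storage index), so a stable seed keeps the file's
    # shares on a stable set of servers:
    #
    #   import hashlib
    #   def _permuted_order(servers, storage_index):
    #       return sorted(servers, key=lambda s:
    #                     hashlib.sha1(s.permutation_seed + storage_index).digest())
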
    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

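        # For reference, a sketch of the tahoe.cfg stanza parsed above
        # (option names come from the get_config calls; the values are
        # illustrative only):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 1G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2011-01-01
        #   expire.immutable = true
        #   expire.mutable = true
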
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(self):
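            # note: when_tub_ready()'s Deferred fires with the node itself,
            # so this callback's 'self' parameter deliberately rebinds to the
            # Client instance (init_key_gen's _subscribe uses the same trick)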
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir_config = self.get_config("node", "web.static", "public_html").decode("utf-8")
        staticdir = abspath_expanduser_unicode(staticdir_config, base=self.basedir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
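
    # e.g. (illustrative, from a test harness; start_the_upload is
    # hypothetical):
    #   d = client.debug_wait_for_client_connections(10)
    #   d.addCallback(lambda ign: start_the_upload())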


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

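    # Usage sketch for the creation primitives (illustrative; assumes a
    # running client with connected storage servers; _made is hypothetical):
    #
    #   d = client.create_dirnode()
    #   def _made(dirnode):
    #       print dirnode.get_uri()
    #       return client.create_node_from_uri(dirnode.get_uri())
    #   d.addCallback(_made)
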
    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)