import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

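# A minimal usage sketch (hypothetical, mirroring what init_secrets()
# does below): both secrets are random strings, and the renewal/cancel
# secrets are deterministic hashes of the lease secret, so repeated
# calls return the same values.
#
#   lease = base32.a2b(_make_secret().strip())
#   convergence = base32.a2b(_make_secret().strip())
#   sh = SecretHolder(lease, convergence)
#   assert sh.get_renewal_secret() == sh.get_renewal_secret()
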
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then by a default set with set_default_keysize(), and
    finally by a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048-bit keys are standard; smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048-bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

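# A minimal usage sketch (hypothetical; use_keys and the 522-bit test
# size are illustrative, not part of this module): generate() is
# asynchronous, so callers attach a callback to receive the keypair.
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)   # e.g. in unit tests, for speed
#   d = kg.generate()
#   d.addCallback(lambda (verifier, signer): use_keys(verifier, signer))
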
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of default encoding parameters. 'k' (needed) is the
    # number of shares required to reconstruct a file. 'happy' (desired)
    # means that we will abort an upload unless we can allocate space for
    # at least this many. 'n' (total) is the total number of shares
    # created by encoding. If everybody has room then this is how many we
    # will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

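    # Worked example with the defaults above: a 1 MiB file is split into
    # 128 KiB segments, each erasure-coded into n=10 shares, any k=3 of
    # which suffice to reconstruct it. Each share is therefore roughly
    # 1/3 of the file size (plus overhead), about 10/3 of the file size
    # in total, and the upload is aborted unless space can be allocated
    # for at least happy=7 shares.
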
    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

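    # (so the first call to _sequencer() returns (1, nonce), the next
    # returns (2, nonce), and so on; the counter is persisted on disk
    # and survives restarts, while the nonce is a fresh random string
    # on every call)
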
    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference and
        # the introducer's status page will be able to show us
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def _maybe_create_node_key(self):
        # we only create the key once; on all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs,vk_vs = keyutil.make_keypair()
            return sk_vs+"\n"
        # for a while (between releases, before 1.10) this was known as
        # server.privkey, but now it lives in node.privkey. This fallback can
        # be removed after 1.10 is released.
        sk_vs = self.get_private_config("server.privkey", None)
        if not sk_vs:
            sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs+"\n")
        self._node_key = sk

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed+"\n")
        return seed.strip()

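    # A rough sketch of the permuted-ring idea (illustrative only, not
    # the exact hash Tahoe uses): for each file, clients order servers
    # by something like hash(storage_index + permutation_seed), so a
    # stable seed keeps this server at a stable position in the ring
    # and previously uploaded shares stay easy to find.
    #
    #   def _ring_position(storage_index, seed):
    #       return hashlib.sha1(storage_index + seed).digest()
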
    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        self._maybe_create_node_key()

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
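        # (an abbreviated size like "1G" in tahoe.cfg is parsed into a
        # byte count here; an unparseable value aborts startup)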
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

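    # A sketch of the tahoe.cfg stanza that the shares.* lookups above
    # read (the values shown are the built-in defaults):
    #
    #   [client]
    #   shares.needed = 3
    #   shares.happy = 7
    #   shares.total = 10
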
    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        # note: when_tub_ready()'s Deferred fires with this node, so the
        # 'self' parameter of the callback below rebinds to this Client
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

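    # (in other words, whatever created the hotline file must keep
    # touching it at least once every 120 seconds, or this node assumes
    # its parent test process has died and stops the reactor)
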
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

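    # A minimal usage sketch (the cap string is hypothetical):
    # create_node_from_uri() returns a node synchronously, while the
    # create_* methods return Deferreds that fire with the new node.
    #
    #   node = client.create_node_from_uri("URI:CHK:...")
    #   d = client.create_dirnode()
    #   d.addCallback(lambda dirnode: dirnode.get_uri())
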
    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)