import os, stat, time, weakref
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

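# A secret is the base32 encoding of hashutil.CRYPTO_VAL_SIZE random bytes,
# newline-terminated so it can be written verbatim into the node's private/
# config files by get_or_create_private_config().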
def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and
            # 3.2 secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed((verifier, signer))
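
# Example usage of KeyGenerator (a minimal sketch; use_keypair() stands in
# for whatever consumes the keys, and 522 bits is merely an illustrative
# small test keysize):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)  # small keys so tests run fast
#   d = kg.generate()
#   d.addCallback(lambda (verifier, signer): use_keypair(verifier, signer))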

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of default encoding parameters. 'k' (needed) is the
    # number of shares required to reconstruct a file. 'happy' (desired)
    # means that we will abort an upload unless we can allocate space for
    # at least this many shares. 'n' (total) is the total number of shares
    # created by encoding; if everybody has room then this is how many we
    # will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
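
    # Worked example: with k=3 and n=10, each file is erasure-coded into 10
    # shares, any 3 of which suffice to reconstruct it, so the shares total
    # roughly n/k = 10/3 ~= 3.3 times the size of the file. An upload is
    # aborted unless space can be allocated for at least happy=7 shares.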

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def _sequencer(self):
        seqnum_s = self.get_config_from_file("announcement-seqnum")
        if not seqnum_s:
            seqnum_s = "0"
        seqnum = int(seqnum_s.strip())
        seqnum += 1 # increment
        self.write_config("announcement-seqnum", "%d\n" % seqnum)
        nonce = _make_secret().strip()
        return seqnum, nonce

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION),
                              self.get_app_versions(),
                              self._sequencer)
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def _maybe_create_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            sk_vs, vk_vs = keyutil.make_keypair()
            return sk_vs + "\n"
        sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
        sk, vk_vs = keyutil.parse_privkey(sk_vs.strip())
        self.write_config("node.pubkey", vk_vs + "\n")
        self._node_key = sk

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_bytes = self._node_key.get_verifying_key_bytes()
        return "v0-" + base32.b2a(vk_bytes)
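    # (The base32 payload above is the raw verifying-key bytes; the "v0-"
    # prefix presumably versions this pubkey-based serverid format.)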

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def _init_permutation_seed(self, ss):
        seed = self.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_bytes = self._node_key.get_verifying_key_bytes()
                seed = base32.b2a(vk_bytes)
            self.write_config("permutation-seed", seed + "\n")
        return seed.strip()

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        self._maybe_create_node_key()

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)
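
        # The options above correspond to a tahoe.cfg [storage] section
        # along these lines (a sketch; the values are illustrative):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 1G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2011-01-01
        #   expire.immutable = true
        #   expire.mutable = true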

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private",
                                     "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ann = {"anonymous-storage-FURL": furl,
                   "permutation-seed-base32": self._init_permutation_seed(ss),
                   }
            self.introducer_client.publish("storage", ann, self._node_key)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
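
        # These map to a tahoe.cfg [client] section like the following (a
        # sketch; the furl values are placeholders):
        #
        #   [client]
        #   introducer.furl = pb://...
        #   helper.furl = pb://...
        #   shares.needed = 3
        #   shares.happy = 7
        #   shares.total = 10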

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir, "private",
                                           "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
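
        # A matching tahoe.cfg section looks like this (a sketch; the paths
        # are illustrative, and [ftpd] is analogous minus the host key
        # options):
        #
        #   [sftpd]
        #   enabled = true
        #   port = 8022
        #   host_pubkey_file = private/ssh_host_rsa_key.pub
        #   host_privkey_file = private/ssh_host_rsa_key
        #   accounts.file = private/accounts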

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
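
    # The hotline file acts as a dead-man switch: the TimerService started
    # in __init__ runs this check every second, and the node shuts down once
    # the file is missing or has not been touched for 120 seconds. A
    # keepalive only needs to refresh the mtime, e.g.:
    #
    #   open(hotline_file, "w").close()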

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
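
    # Example (a test-fixture sketch; begin_upload() is hypothetical):
    #
    #   d = client.debug_wait_for_client_connections(5)
    #   d.addCallback(lambda ign: begin_upload())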


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri;
        # instead we may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri,
                                              deep_immutable=deep_immutable,
                                              name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children,
                                                        version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)
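
    # Example (a minimal sketch): create a mutable directory, then hand its
    # writecap to something else. create_dirnode() returns a Deferred that
    # fires with the new dirnode.
    #
    #   d = client.create_dirnode()
    #   d.addCallback(lambda dirnode: dirnode.get_uri())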

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)