]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
Prevent mutable objects from being retrieved from an immutable directory, and associa...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.abbreviate import parse_abbreviated_size
21 from allmydata.util.time_format import parse_duration, parse_date
22 from allmydata.stats import StatsProvider
23 from allmydata.history import History
24 from allmydata.interfaces import IStatsProducer, RIStubClient
25 from allmydata.nodemaker import NodeMaker
26
27
# Binary size units, in bytes; each one is 1024 times the previous one.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
33
class StubClient(Referenceable):
    """An empty Referenceable whose only content is the RIStubClient
    interface declaration."""
    implements(RIStubClient)
36
def _make_secret():
    """Return a freshly generated secret as text.

    hashutil.CRYPTO_VAL_SIZE bytes are read from os.urandom(), encoded with
    base32.b2a(), and terminated with a newline.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + "\n"
39
class SecretHolder:
    """I hold the lease secret and the convergence secret.

    The renewal and cancel secrets are computed on demand by hashing the
    lease secret; the convergence secret is returned exactly as given.
    """

    def __init__(self, lease_secret, convergence_secret):
        """Remember both secrets for later use."""
        self._convergence_secret = convergence_secret
        self._lease_secret = lease_secret

    def get_renewal_secret(self):
        """Return hashutil.my_renewal_secret_hash() of the lease secret."""
        lease = self._lease_secret
        return hashutil.my_renewal_secret_hash(lease)

    def get_cancel_secret(self):
        """Return hashutil.my_cancel_secret_hash() of the lease secret."""
        lease = self._lease_secret
        return hashutil.my_cancel_secret_hash(lease)

    def get_convergence_secret(self):
        """Return the convergence secret passed to the constructor."""
        secret = self._convergence_secret
        return secret
53
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        # remote key-generator reference, or None to generate keys locally
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        """Remember keygen as the remote key generator. While it is set (and
        true), generate() asks it for keys with callRemote(); pass None to
        make generate() create keys locally again."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller (522 bit) keys, so the
        unit tests run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (522 bit keys are fast for testing,
        2048 bit keys are standard). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        # a false keysize (None or 0) selects the default
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs(keypair):
                # The remote side answers with a pair of serialized keys.
                # Unpack it here rather than in the signature: tuple
                # parameters are a syntax error on Python 3.
                verifying_key, signing_key = keypair
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
96
97
98 class Client(node.Node, pollmixin.PollMixin):
99     implements(IStatsProducer)
100
101     PORTNUMFILE = "client.port"
102     STOREDIR = 'storage'
103     NODETYPE = "client"
104     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
105
106     # This means that if a storage server treats me as though I were a
107     # 1.0.0 storage client, it will work as they expect.
108     OLDEST_SUPPORTED_VERSION = "1.0.0"
109
110     # this is a dict with keys "k", "happy", "n", and "max_segment_size". "k"
111     # (needed) is the number of shares required to reconstruct a file. "happy"
112     # (desired) means that we will abort an upload unless we can allocate space
113     # for at least this many. "n" (total) is the total number of shares created
114     # by encoding. If everybody has room then this is how many we will upload.
115     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
116                                    "happy": 7,
117                                    "n": 10,
118                                    "max_segment_size": 128*KiB,
119                                    }
120
121     def __init__(self, basedir="."):
122         node.Node.__init__(self, basedir)
123         self.started_timestamp = time.time()
124         self.logSource="Client"
125         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
126         self.init_introducer_client()
127         self.init_stats_provider()
128         self.init_secrets()
129         self.init_storage()
130         self.init_control()
131         self.helper = None
132         if self.get_config("helper", "enabled", False, boolean=True):
133             self.init_helper()
134         self._key_generator = KeyGenerator()
135         key_gen_furl = self.get_config("client", "key_generator.furl", None)
136         if key_gen_furl:
137             self.init_key_gen(key_gen_furl)
138         self.init_client()
139         # ControlServer and Helper are attached after Tub startup
140         self.init_ftp_server()
141         self.init_sftp_server()
142
143         hotline_file = os.path.join(self.basedir,
144                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
145         if os.path.exists(hotline_file):
146             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
147             self.log("hotline file noticed (%ds old), starting timer" % age)
148             hotline = TimerService(1.0, self._check_hotline, hotline_file)
149             hotline.setServiceParent(self)
150
151         # this needs to happen last, so it can use getServiceNamed() to
152         # acquire references to StorageServer and other web-statusable things
153         webport = self.get_config("node", "web.port", None)
154         if webport:
155             self.init_web(webport) # strports string
156
157     def read_old_config_files(self):
158         node.Node.read_old_config_files(self)
159         copy = self._copy_config_from_file
160         copy("introducer.furl", "client", "introducer.furl")
161         copy("helper.furl", "client", "helper.furl")
162         copy("key_generator.furl", "client", "key_generator.furl")
163         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
164         if os.path.exists(os.path.join(self.basedir, "no_storage")):
165             self.set_config("storage", "enabled", "false")
166         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
167             self.set_config("storage", "readonly", "true")
168         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
169             self.set_config("storage", "debug_discard", "true")
170         if os.path.exists(os.path.join(self.basedir, "run_helper")):
171             self.set_config("helper", "enabled", "true")
172
173     def init_introducer_client(self):
174         self.introducer_furl = self.get_config("client", "introducer.furl")
175         ic = IntroducerClient(self.tub, self.introducer_furl,
176                               self.nickname,
177                               str(allmydata.__full_version__),
178                               str(self.OLDEST_SUPPORTED_VERSION))
179         self.introducer_client = ic
180         # hold off on starting the IntroducerClient until our tub has been
181         # started, so we'll have a useful address on our RemoteReference, so
182         # that the introducer's status page will show us.
183         d = self.when_tub_ready()
184         def _start_introducer_client(res):
185             ic.setServiceParent(self)
186         d.addCallback(_start_introducer_client)
187         d.addErrback(log.err, facility="tahoe.init",
188                      level=log.BAD, umid="URyI5w")
189
190     def init_stats_provider(self):
191         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
192         self.stats_provider = StatsProvider(self, gatherer_furl)
193         self.add_service(self.stats_provider)
194         self.stats_provider.register_producer(self)
195
196     def get_stats(self):
197         return { 'node.uptime': time.time() - self.started_timestamp }
198
199     def init_secrets(self):
200         lease_s = self.get_or_create_private_config("secret", _make_secret)
201         lease_secret = base32.a2b(lease_s)
202         convergence_s = self.get_or_create_private_config('convergence',
203                                                           _make_secret)
204         self.convergence = base32.a2b(convergence_s)
205         self._secret_holder = SecretHolder(lease_secret, self.convergence)
206
207     def init_storage(self):
208         # should we run a storage server (and publish it for others to use)?
209         if not self.get_config("storage", "enabled", True, boolean=True):
210             return
211         readonly = self.get_config("storage", "readonly", False, boolean=True)
212
213         storedir = os.path.join(self.basedir, self.STOREDIR)
214
215         data = self.get_config("storage", "reserved_space", None)
216         reserved = None
217         try:
218             reserved = parse_abbreviated_size(data)
219         except ValueError:
220             log.msg("[storage]reserved_space= contains unparseable value %s"
221                     % data)
222         if reserved is None:
223             reserved = 0
224         discard = self.get_config("storage", "debug_discard", False,
225                                   boolean=True)
226
227         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
228         if expire:
229             mode = self.get_config("storage", "expire.mode") # require a mode
230         else:
231             mode = self.get_config("storage", "expire.mode", "age")
232
233         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
234         if o_l_d is not None:
235             o_l_d = parse_duration(o_l_d)
236
237         cutoff_date = None
238         if mode == "cutoff-date":
239             cutoff_date = self.get_config("storage", "expire.cutoff_date")
240             cutoff_date = parse_date(cutoff_date)
241
242         sharetypes = []
243         if self.get_config("storage", "expire.immutable", True, boolean=True):
244             sharetypes.append("immutable")
245         if self.get_config("storage", "expire.mutable", True, boolean=True):
246             sharetypes.append("mutable")
247         expiration_sharetypes = tuple(sharetypes)
248
249         ss = StorageServer(storedir, self.nodeid,
250                            reserved_space=reserved,
251                            discard_storage=discard,
252                            readonly_storage=readonly,
253                            stats_provider=self.stats_provider,
254                            expiration_enabled=expire,
255                            expiration_mode=mode,
256                            expiration_override_lease_duration=o_l_d,
257                            expiration_cutoff_date=cutoff_date,
258                            expiration_sharetypes=expiration_sharetypes)
259         self.add_service(ss)
260
261         d = self.when_tub_ready()
262         # we can't do registerReference until the Tub is ready
263         def _publish(res):
264             furl_file = os.path.join(self.basedir, "private", "storage.furl")
265             furl = self.tub.registerReference(ss, furlFile=furl_file)
266             ri_name = RIStorageServer.__remote_name__
267             self.introducer_client.publish(furl, "storage", ri_name)
268         d.addCallback(_publish)
269         d.addErrback(log.err, facility="tahoe.init",
270                      level=log.BAD, umid="aLGBKw")
271
272     def init_client(self):
273         helper_furl = self.get_config("client", "helper.furl", None)
274         DEP = self.DEFAULT_ENCODING_PARAMETERS
275         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
276         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
277         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
278
279         self.init_client_storage_broker()
280         self.history = History(self.stats_provider)
281         self.add_service(Uploader(helper_furl, self.stats_provider))
282         download_cachedir = os.path.join(self.basedir,
283                                          "private", "cache", "download")
284         self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
285         self.download_cache_dirman.setServiceParent(self)
286         self.downloader = Downloader(self.storage_broker, self.stats_provider)
287         self.init_stub_client()
288         self.init_nodemaker()
289
290     def init_client_storage_broker(self):
291         # create a StorageFarmBroker object, for use by Uploader/Downloader
292         # (and everybody else who wants to use storage servers)
293         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
294         self.storage_broker = sb
295
296         # load static server specifications from tahoe.cfg, if any.
297         # Not quite ready yet.
298         #if self.config.has_section("client-server-selection"):
299         #    server_params = {} # maps serverid to dict of parameters
300         #    for (name, value) in self.config.items("client-server-selection"):
301         #        pieces = name.split(".")
302         #        if pieces[0] == "server":
303         #            serverid = pieces[1]
304         #            if serverid not in server_params:
305         #                server_params[serverid] = {}
306         #            server_params[serverid][pieces[2]] = value
307         #    for serverid, params in server_params.items():
308         #        server_type = params.pop("type")
309         #        if server_type == "tahoe-foolscap":
310         #            s = storage_client.NativeStorageClient(*params)
311         #        else:
312         #            msg = ("unrecognized server type '%s' in "
313         #                   "tahoe.cfg [client-server-selection]server.%s.type"
314         #                   % (server_type, serverid))
315         #            raise storage_client.UnknownServerTypeError(msg)
316         #        sb.add_server(s.serverid, s)
317
318         # check to see if we're supposed to use the introducer too
319         if self.get_config("client-server-selection", "use_introducer",
320                            default=True, boolean=True):
321             sb.use_introducer(self.introducer_client)
322
323     def get_storage_broker(self):
324         return self.storage_broker
325
326     def init_stub_client(self):
327         def _publish(res):
328             # we publish an empty object so that the introducer can count how
329             # many clients are connected and see what versions they're
330             # running.
331             sc = StubClient()
332             furl = self.tub.registerReference(sc)
333             ri_name = RIStubClient.__remote_name__
334             self.introducer_client.publish(furl, "stub_client", ri_name)
335         d = self.when_tub_ready()
336         d.addCallback(_publish)
337         d.addErrback(log.err, facility="tahoe.init",
338                      level=log.BAD, umid="OEHq3g")
339
340     def init_nodemaker(self):
341         self.nodemaker = NodeMaker(self.storage_broker,
342                                    self._secret_holder,
343                                    self.get_history(),
344                                    self.getServiceNamed("uploader"),
345                                    self.downloader,
346                                    self.download_cache_dirman,
347                                    self.get_encoding_parameters(),
348                                    self._key_generator)
349
350     def get_history(self):
351         return self.history
352
353     def init_control(self):
354         d = self.when_tub_ready()
355         def _publish(res):
356             c = ControlServer()
357             c.setServiceParent(self)
358             control_url = self.tub.registerReference(c)
359             self.write_private_config("control.furl", control_url + "\n")
360         d.addCallback(_publish)
361         d.addErrback(log.err, facility="tahoe.init",
362                      level=log.BAD, umid="d3tNXA")
363
364     def init_helper(self):
365         d = self.when_tub_ready()
366         def _publish(self):
367             self.helper = Helper(os.path.join(self.basedir, "helper"),
368                                  self.storage_broker, self._secret_holder,
369                                  self.stats_provider, self.history)
370             # TODO: this is confusing. BASEDIR/private/helper.furl is created
371             # by the helper. BASEDIR/helper.furl is consumed by the client
372             # who wants to use the helper. I like having the filename be the
373             # same, since that makes 'cp' work smoothly, but the difference
374             # between config inputs and generated outputs is hard to see.
375             helper_furlfile = os.path.join(self.basedir,
376                                            "private", "helper.furl")
377             self.tub.registerReference(self.helper, furlFile=helper_furlfile)
378         d.addCallback(_publish)
379         d.addErrback(log.err, facility="tahoe.init",
380                      level=log.BAD, umid="K0mW5w")
381
382     def init_key_gen(self, key_gen_furl):
383         d = self.when_tub_ready()
384         def _subscribe(self):
385             self.tub.connectTo(key_gen_furl, self._got_key_generator)
386         d.addCallback(_subscribe)
387         d.addErrback(log.err, facility="tahoe.init",
388                      level=log.BAD, umid="z9DMzw")
389
390     def _got_key_generator(self, key_generator):
391         self._key_generator.set_remote_generator(key_generator)
392         key_generator.notifyOnDisconnect(self._lost_key_generator)
393
394     def _lost_key_generator(self):
395         self._key_generator.set_remote_generator(None)
396
397     def set_default_mutable_keysize(self, keysize):
398         self._key_generator.set_default_keysize(keysize)
399
400     def init_web(self, webport):
401         self.log("init_web(webport=%s)", args=(webport,))
402
403         from allmydata.webish import WebishServer
404         nodeurl_path = os.path.join(self.basedir, "node.url")
405         staticdir = self.get_config("node", "web.static", "public_html")
406         staticdir = os.path.expanduser(staticdir)
407         ws = WebishServer(self, webport, nodeurl_path, staticdir)
408         self.add_service(ws)
409
410     def init_ftp_server(self):
411         if self.get_config("ftpd", "enabled", False, boolean=True):
412             accountfile = self.get_config("ftpd", "accounts.file", None)
413             accounturl = self.get_config("ftpd", "accounts.url", None)
414             ftp_portstr = self.get_config("ftpd", "port", "8021")
415
416             from allmydata.frontends import ftpd
417             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
418             s.setServiceParent(self)
419
420     def init_sftp_server(self):
421         if self.get_config("sftpd", "enabled", False, boolean=True):
422             accountfile = self.get_config("sftpd", "accounts.file", None)
423             accounturl = self.get_config("sftpd", "accounts.url", None)
424             sftp_portstr = self.get_config("sftpd", "port", "8022")
425             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
426             privkey_file = self.get_config("sftpd", "host_privkey_file")
427
428             from allmydata.frontends import sftpd
429             s = sftpd.SFTPServer(self, accountfile, accounturl,
430                                  sftp_portstr, pubkey_file, privkey_file)
431             s.setServiceParent(self)
432
433     def _check_hotline(self, hotline_file):
434         if os.path.exists(hotline_file):
435             mtime = os.stat(hotline_file)[stat.ST_MTIME]
436             if mtime > time.time() - 120.0:
437                 return
438             else:
439                 self.log("hotline file too old, shutting down")
440         else:
441             self.log("hotline file missing, shutting down")
442         reactor.stop()
443
444     def get_encoding_parameters(self):
445         return self.DEFAULT_ENCODING_PARAMETERS
446
447     def connected_to_introducer(self):
448         if self.introducer_client:
449             return self.introducer_client.connected_to_introducer()
450         return False
451
452     def get_renewal_secret(self): # this will go away
453         return self._secret_holder.get_renewal_secret()
454
455     def get_cancel_secret(self):
456         return self._secret_holder.get_cancel_secret()
457
458     def debug_wait_for_client_connections(self, num_clients):
459         """Return a Deferred that fires (with None) when we have connections
460         to the given number of peers. Useful for tests that set up a
461         temporary test network and need to know when it is safe to proceed
462         with an upload or download."""
463         def _check():
464             return len(self.storage_broker.get_all_servers()) >= num_clients
465         d = self.poll(_check, 0.5)
466         d.addCallback(lambda res: None)
467         return d
468
469
470     # these four methods are the primitives for creating filenodes and
471     # dirnodes. The first takes a URI and produces a filenode or (new-style)
472     # dirnode. The other three create brand-new filenodes/dirnodes.
473
474     def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
475         # This returns synchronously.
476         # Note that it does *not* validate the write_uri and read_uri; instead we
477         # may get an opaque node if there were any problems.
478         return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
479
480     def create_dirnode(self, initial_children={}):
481         d = self.nodemaker.create_new_mutable_directory(initial_children)
482         return d
483
484     def create_immutable_dirnode(self, children, convergence=None):
485         return self.nodemaker.create_immutable_directory(children, convergence)
486
487     def create_mutable_file(self, contents=None, keysize=None):
488         return self.nodemaker.create_mutable_file(contents, keysize)
489
490     def upload(self, uploadable):
491         uploader = self.getServiceNamed("uploader")
492         return uploader.upload(uploadable, history=self.get_history())