# tahoe-lafs -- src/allmydata/client.py
# snapshot at commit "Fix test failures due to Unicode basedir patches."
1 import os, stat, time
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient
26 from allmydata.nodemaker import NodeMaker
27
28
# Binary size units (powers of 1024), used for the default encoding
# parameters and for parsing storage-space configuration.
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
34
class StubClient(Referenceable):
    """An empty remotely-referenceable object. One of these is published to
    the introducer (see Client.init_stub_client) so the introducer can count
    connected clients and see what versions they are running."""
    implements(RIStubClient)
37
def _make_secret():
    """Return a freshly generated secret: CRYPTO_VAL_SIZE random bytes,
    base32-encoded, with a trailing newline so it can be written directly
    into a private config file."""
    entropy = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return "%s\n" % base32.b2a(entropy)
40
class SecretHolder:
    """Container for the client's lease secret and convergence secret.

    The lease secret itself is never handed out; it is hashed into the
    per-use renewal and cancel secrets. The convergence secret is returned
    unchanged."""

    def __init__(self, lease_secret, convergence_secret):
        # both are kept private; use the accessors below
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_convergence_secret(self):
        """Return the convergence secret, as-is."""
        return self._convergence_secret

    def get_renewal_secret(self):
        """Return the renewal secret derived from our lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the cancel secret derived from our lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)
54
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""

    def __init__(self):
        self._remote = None          # optional remote key-generator service
        self.default_keysize = 2048  # bits; used when generate() gets no keysize

    def set_remote_generator(self, keygen):
        """Use the given remote key-generator for subsequent generate() calls."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller (522 bit) keys, so the
        unit tests run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (522 bit keys are fast for testing,
        2048 bit keys are standard). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if not self._remote:
            # local generation; a 2048 bit key takes between 0.8 and 3.2 secs
            signer = rsa.generate(keysize)
            return defer.succeed( (signer.get_verifying_key(), signer) )
        d = self._remote.callRemote('get_rsa_key_pair', keysize)
        def _deserialize(keys):
            (verifying_key, signing_key) = keys
            return (rsa.create_verifying_key_from_string(verifying_key),
                    rsa.create_signing_key_from_string(signing_key))
        d.addCallback(_deserialize)
        return d
97
98
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: ties together the introducer client, optional
    storage server, uploader/downloader, optional upload helper, and the
    web/ftp/sftp frontends."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"   # where node.Node records our tub's port
    STOREDIR = 'storage'          # subdirectory holding StorageServer data
    NODETYPE = "client"
    # if this file exists in basedir, the node shuts itself down unless the
    # file keeps being touched (see __init__ and _check_hotline)
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a dict of erasure-coding parameters: 'k' (needed), 'happy'
    # (desired), 'n' (total), and max_segment_size. 'k' is the number of
    # shares required to reconstruct a file. 'happy' means that we will
    # abort an upload unless we can allocate space for at least this many.
    # 'n' is the total number of shares created by encoding. If everybody
    # has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
121
    def __init__(self, basedir="."):
        """Build a client node rooted at 'basedir': read config (via
        node.Node) and start every enabled sub-service. The web server is
        initialized last so it can find other services via getServiceNamed."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # for the 'node.uptime' stat
        self.logSource="Client"
        # take a per-instance copy so the config overrides applied in
        # init_client() don't mutate the shared class-level dict
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists, shut the node down unless it keeps
        # being touched (checked once a second by _check_hotline)
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
157
    def read_old_config_files(self):
        """Translate pre-tahoe.cfg configuration (per-item BASEDIR/*.furl
        files and empty marker files) into the equivalent config sections."""
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        # for these, the mere existence of the file used to toggle behavior
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")
173
    def init_introducer_client(self):
        """Create our IntroducerClient (which announces us to, and learns
        about servers from, the introducer) and arrange to start it once
        our Tub is ready."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
190
    def init_stats_provider(self):
        """Start the StatsProvider service (optionally reporting to a remote
        stats-gatherer) and register ourselves as one of its producers."""
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)
196
197     def get_stats(self):
198         return { 'node.uptime': time.time() - self.started_timestamp }
199
    def init_secrets(self):
        """Load (creating on first run) the lease secret and convergence
        secret from BASEDIR/private/, and wrap them in a SecretHolder."""
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)
207
    def init_storage(self):
        """Create, start, and publish our StorageServer, honoring the
        [storage] config section (enabled/readonly/reserved_space/
        debug_discard plus the lease-expiration settings)."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            # log the bad value and fall through to reserved=0 below
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            # only required (and only parsed) in cutoff-date mode
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            # furlFile is a filesystem path, hence the bytes encoding
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
272
    def init_client(self):
        """Set up the upload/download machinery: encoding-parameter
        overrides from [client], the storage broker, History, the Uploader
        service, the download cache, the Downloader, the stub client, and
        finally the NodeMaker."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        # per-node overrides for the erasure-coding parameters
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache_dirman.setServiceParent(self)
        self.downloader = Downloader(self.storage_broker, self.stats_provider)
        self.init_stub_client()
        self.init_nodemaker()
290
    def init_client_storage_broker(self):
        """Create the StorageFarmBroker that tracks which storage servers
        we know about, and (by default) feed it servers announced via the
        introducer."""
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)
323
    def get_storage_broker(self):
        """Accessor for the StorageFarmBroker created in
        init_client_storage_broker()."""
        return self.storage_broker
326
    def init_stub_client(self):
        """Once our Tub is ready, publish an empty StubClient to the
        introducer so it can count connected clients."""
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
340
    def init_nodemaker(self):
        """Create the NodeMaker, the factory that turns caps into filenodes
        and dirnodes (see the create_* methods below)."""
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.downloader,
                                   self.download_cache_dirman,
                                   self.get_encoding_parameters(),
                                   self._key_generator)
350
    def get_history(self):
        """Accessor for the History service created in init_client()."""
        return self.history
353
    def init_control(self):
        """Once our Tub is ready, register a ControlServer and record its
        FURL (unpublished; for debugging tools) in private/control.furl."""
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")
364
    def init_helper(self):
        """Create the upload Helper and register it (with a private FURL)
        once our Tub is ready."""
        d = self.when_tub_ready()
        # NOTE(review): this callback's parameter is named 'self', which
        # shadows the method's 'self'. It only works if when_tub_ready()
        # fires its Deferred with the node itself (init_key_gen relies on
        # the same thing) -- confirm against node.Node.when_tub_ready.
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")
382
    def init_key_gen(self, key_gen_furl):
        """Once our Tub is ready, maintain a connection to the remote key
        generator service at key_gen_furl."""
        d = self.when_tub_ready()
        # NOTE(review): parameter named 'self' shadows the method's 'self';
        # see the matching note in init_helper.
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")
390
    def _got_key_generator(self, key_generator):
        """connectTo callback: start using the remote key generator, and
        arrange to fall back to local generation if it disconnects."""
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)
394
    def _lost_key_generator(self):
        """Disconnect callback: revert to local RSA key generation."""
        self._key_generator.set_remote_generator(None)
397
    def set_default_mutable_keysize(self, keysize):
        """Set the RSA keysize used for new mutable files. Tests use this
        to request smaller (faster) keys."""
        self._key_generator.set_default_keysize(keysize)
400
    def init_web(self, webport):
        """Start the webapi/WUI frontend listening on 'webport' (a twisted
        strports specification string)."""
        self.log("init_web(webport=%s)", args=(webport,))

        # imported here rather than at module level, presumably to avoid the
        # import cost (or a cycle) when no webport is configured -- confirm
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
410
    def init_ftp_server(self):
        """Start the FTP frontend if [ftpd]enabled is set. The ftpd module
        is imported lazily so disabled nodes don't pay for it."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)
420
    def init_sftp_server(self):
        """Start the SFTP frontend if [sftpd]enabled is set. Unlike ftpd,
        the host keypair files are required (no default)."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
433
434     def _check_hotline(self, hotline_file):
435         if os.path.exists(hotline_file):
436             mtime = os.stat(hotline_file)[stat.ST_MTIME]
437             if mtime > time.time() - 120.0:
438                 return
439             else:
440                 self.log("hotline file too old, shutting down")
441         else:
442             self.log("hotline file missing, shutting down")
443         reactor.stop()
444
    def get_encoding_parameters(self):
        """Return this node's erasure-coding parameter dict (k/happy/n/
        max_segment_size). Note: callers get the live dict, not a copy."""
        return self.DEFAULT_ENCODING_PARAMETERS
447
448     def connected_to_introducer(self):
449         if self.introducer_client:
450             return self.introducer_client.connected_to_introducer()
451         return False
452
    def get_renewal_secret(self): # this will go away
        """Deprecated passthrough to the SecretHolder."""
        return self._secret_holder.get_renewal_secret()
455
    def get_cancel_secret(self):
        """Return the lease cancel secret (delegates to the SecretHolder)."""
        return self._secret_holder.get_cancel_secret()
458
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_all_servers()) >= num_clients
        # poll every half second until the predicate holds
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
469
470
471     # these four methods are the primitives for creating filenodes and
472     # dirnodes. The first takes a URI and produces a filenode or (new-style)
473     # dirnode. The other three create brand-new filenodes/dirnodes.
474
    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        """Turn a cap (URI string) into a filenode or dirnode. 'name' is
        only used for error reporting."""
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
480
481     def create_dirnode(self, initial_children={}):
482         d = self.nodemaker.create_new_mutable_directory(initial_children)
483         return d
484
    def create_immutable_dirnode(self, children, convergence=None):
        """Create (upload) a new immutable directory containing 'children'.
        Returns a Deferred that fires with the new dirnode."""
        return self.nodemaker.create_immutable_directory(children, convergence)
487
    def create_mutable_file(self, contents=None, keysize=None):
        """Create a brand-new mutable file. Returns a Deferred that fires
        with the new filenode."""
        return self.nodemaker.create_mutable_file(contents, keysize)
490
    def upload(self, uploadable):
        """Upload 'uploadable' via the Uploader service, recording the
        operation in our History; returns the Uploader's Deferred."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())