# src/allmydata/client.py
import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient, \
                                 SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns
    a single keypair. The keysize is taken from the keysize= argument to
    generate() if provided, otherwise from the default set by
    set_default_keysize(), otherwise from a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048-bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048-bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048-bit key takes between 0.8 and
            # 3.2 secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
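
# A minimal usage sketch (a sketch only: it assumes a running Twisted
# reactor, use_keypair() is a hypothetical callback, and the tiny keysize
# follows the testing advice in set_default_keysize's docstring):
#
#   kg = KeyGenerator()
#   kg.set_default_keysize(522)    # small keys: fast, but only for tests
#   d = kg.generate()              # Deferred -> (verifying_key, signing_key)
#   d.addCallback(lambda (v, s): use_keypair(v, s))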

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, everything will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of encoding parameters. 'k' is the number of shares
    # required to reconstruct a file. 'happy' means that we will abort an
    # upload unless we can allocate space for at least this many shares.
    # 'n' is the total number of shares created by encoding. If everybody
    # has room then this is how many we will upload. A file is broken into
    # segments of at most 'max_segment_size' bytes before encoding.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
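
    # For example, with these defaults a file is encoded into n=10 shares,
    # any k=3 of which suffice to reconstruct it (so each share is roughly
    # one third of the file's size, plus a small overhead), and the upload
    # is aborted unless space can be found for at least happy=7 of them.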

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so that we'll have a useful address on our RemoteReference
        # and the introducer's status page can show us
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
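
        # (reserved_space accepts abbreviated sizes; values like "5G" or
        # "2GiB" are plausible examples, though the exact suffixes accepted
        # are defined by allmydata.util.abbreviate.parse_abbreviated_size.)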
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)
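
        # A tahoe.cfg sketch exercising the [storage] options read above
        # (values are illustrative, not defaults):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 1G
        #   expire.enabled = true
        #   expire.mode = cutoff-date
        #   expire.cutoff_date = 2011-01-01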

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_stub_client()
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_blacklist(self):
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
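        # (selected via tahoe.cfg, e.g. "[client] mutable.format = MDMF";
        # the comparison is case-insensitive, and any other value falls
        # back to SDMF)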
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
            local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)

            if upload_dircap and local_dir_utf8:
                try:
                    from allmydata.frontends import drop_upload
                    s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                    s.setServiceParent(self)
                    s.startService()
                except Exception, e:
                    self.log("couldn't start drop-uploader: %r", args=(e,))
            else:
                self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
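
    # The hotline file acts as a dead-man's switch, e.g. for a test harness:
    # as long as something keeps touching BASEDIR/suicide_prevention_hotline
    # at least every 120 seconds, the node keeps running; once the file goes
    # stale or disappears, the node shuts itself down.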

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d
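
    # Typical test usage (a sketch; 'client' is a started Client instance
    # and _begin_upload is a hypothetical next step):
    #   d = client.debug_wait_for_client_connections(3)
    #   d.addCallback(lambda ign: _begin_upload())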


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)
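    # For example (a sketch): create_mutable_file(version=MDMF_VERSION)
    # returns a Deferred that fires with a new MDMF mutable filenode.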

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)
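
# Usage sketch for the upload API (a hypothetical call site; FileName is one
# of the IUploadable wrappers provided by allmydata.immutable.upload):
#
#   from allmydata.immutable.upload import FileName
#   u = FileName("big.dat", convergence=client.convergence)
#   d = client.upload(u)   # Deferred firing with the upload results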