git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
b030867bc19ef6a293df02bd42c030a401cf4e30
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26                                  SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
29
30
# Binary size multipliers: each unit is 1024 (2**10) times the previous one.
KiB = 1 << 10
MiB = KiB << 10
GiB = MiB << 10
TiB = GiB << 10
PiB = TiB << 10
36
class StubClient(Referenceable):
    """An empty remotely-referenceable object declaring RIStubClient.

    Client.init_stub_client() registers an instance of me with the Tub and
    publishes the resulting FURL to the introducer.
    """
    implements(RIStubClient)
39
def _make_secret():
    """Return a fresh random secret as a newline-terminated base32 string.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes read from os.urandom().
    """
    raw_secret = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded_secret = base32.b2a(raw_secret)
    return encoded_secret + "\n"
42
class SecretHolder:
    """I hold a lease secret and a convergence secret.

    The renewal and cancel secrets are not stored; each is derived from the
    lease secret with the matching hashutil function every time it is
    requested. The convergence secret is handed back unchanged.
    """

    def __init__(self, lease_secret, convergence_secret):
        self._convergence_secret = convergence_secret
        self._lease_secret = lease_secret

    def get_renewal_secret(self):
        """Return hashutil.my_renewal_secret_hash() of the lease secret."""
        renewal_secret = hashutil.my_renewal_secret_hash(self._lease_secret)
        return renewal_secret

    def get_cancel_secret(self):
        """Return hashutil.my_cancel_secret_hash() of the lease secret."""
        cancel_secret = hashutil.my_cancel_secret_hash(self._lease_secret)
        return cancel_secret

    def get_convergence_secret(self):
        """Return the convergence secret exactly as given to __init__."""
        return self._convergence_secret
56
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        # remote key-generator reference, or None to generate keys locally
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        """Use 'keygen' as the remote object that generate() asks for keys
        (via callRemote('get_rsa_key_pair', keysize)). Pass None to go back
        to generating keys locally."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        # any false keysize (None, 0) selects the default
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs(key_strings):
                # The remote side answers with a pair of serialized keys.
                # Unpack it here instead of in the parameter list: tuple
                # parameters were removed from the language by PEP 3113,
                # while this form is valid on every Python version.
                (verifying_key, signing_key) = key_strings
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
99
class Terminator(service.Service):
    """I am a service that calls stop() on every registered object when I
    am stopped.

    Registered objects are held as keys of a WeakKeyDictionary, so being
    registered with me does not by itself keep an object alive.
    """

    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Remember 'c' so that stopService() will call c.stop()."""
        # only the key matters; the value is a placeholder
        self._clients[c] = None

    def stopService(self):
        """Call stop() on each registered object that is still alive, then
        defer to service.Service.stopService() and return its result."""
        for registered in self._clients:
            registered.stop()
        return service.Service.stopService(self)
109
110
111 class Client(node.Node, pollmixin.PollMixin):
112     implements(IStatsProducer)
113
114     PORTNUMFILE = "client.port"
115     STOREDIR = 'storage'
116     NODETYPE = "client"
117     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
118
119     # This means that if a storage server treats me as though I were a
120     # 1.0.0 storage client, it will work as they expect.
121     OLDEST_SUPPORTED_VERSION = "1.0.0"
122
123     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
124     # is the number of shares required to reconstruct a file. 'desired' means
125     # that we will abort an upload unless we can allocate space for at least
126     # this many. 'total' is the total number of shares created by encoding.
127     # If everybody has room then this is is how many we will upload.
128     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
129                                    "happy": 7,
130                                    "n": 10,
131                                    "max_segment_size": 128*KiB,
132                                    }
133
134     def __init__(self, basedir="."):
135         node.Node.__init__(self, basedir)
136         self.started_timestamp = time.time()
137         self.logSource="Client"
138         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
139         self.init_introducer_client()
140         self.init_stats_provider()
141         self.init_secrets()
142         self.init_storage()
143         self.init_control()
144         self.helper = None
145         if self.get_config("helper", "enabled", False, boolean=True):
146             self.init_helper()
147         self._key_generator = KeyGenerator()
148         key_gen_furl = self.get_config("client", "key_generator.furl", None)
149         if key_gen_furl:
150             self.init_key_gen(key_gen_furl)
151         self.init_client()
152         # ControlServer and Helper are attached after Tub startup
153         self.init_ftp_server()
154         self.init_sftp_server()
155         self.init_drop_uploader()
156
157         hotline_file = os.path.join(self.basedir,
158                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
159         if os.path.exists(hotline_file):
160             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
161             self.log("hotline file noticed (%ds old), starting timer" % age)
162             hotline = TimerService(1.0, self._check_hotline, hotline_file)
163             hotline.setServiceParent(self)
164
165         # this needs to happen last, so it can use getServiceNamed() to
166         # acquire references to StorageServer and other web-statusable things
167         webport = self.get_config("node", "web.port", None)
168         if webport:
169             self.init_web(webport) # strports string
170
171     def init_introducer_client(self):
172         self.introducer_furl = self.get_config("client", "introducer.furl")
173         ic = IntroducerClient(self.tub, self.introducer_furl,
174                               self.nickname,
175                               str(allmydata.__full_version__),
176                               str(self.OLDEST_SUPPORTED_VERSION))
177         self.introducer_client = ic
178         # hold off on starting the IntroducerClient until our tub has been
179         # started, so we'll have a useful address on our RemoteReference, so
180         # that the introducer's status page will show us.
181         d = self.when_tub_ready()
182         def _start_introducer_client(res):
183             ic.setServiceParent(self)
184         d.addCallback(_start_introducer_client)
185         d.addErrback(log.err, facility="tahoe.init",
186                      level=log.BAD, umid="URyI5w")
187
188     def init_stats_provider(self):
189         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
190         self.stats_provider = StatsProvider(self, gatherer_furl)
191         self.add_service(self.stats_provider)
192         self.stats_provider.register_producer(self)
193
194     def get_stats(self):
195         return { 'node.uptime': time.time() - self.started_timestamp }
196
197     def init_secrets(self):
198         lease_s = self.get_or_create_private_config("secret", _make_secret)
199         lease_secret = base32.a2b(lease_s)
200         convergence_s = self.get_or_create_private_config('convergence',
201                                                           _make_secret)
202         self.convergence = base32.a2b(convergence_s)
203         self._secret_holder = SecretHolder(lease_secret, self.convergence)
204
205     def init_storage(self):
206         # should we run a storage server (and publish it for others to use)?
207         if not self.get_config("storage", "enabled", True, boolean=True):
208             return
209         readonly = self.get_config("storage", "readonly", False, boolean=True)
210
211         storedir = os.path.join(self.basedir, self.STOREDIR)
212
213         data = self.get_config("storage", "reserved_space", None)
214         reserved = None
215         try:
216             reserved = parse_abbreviated_size(data)
217         except ValueError:
218             log.msg("[storage]reserved_space= contains unparseable value %s"
219                     % data)
220         if reserved is None:
221             reserved = 0
222         discard = self.get_config("storage", "debug_discard", False,
223                                   boolean=True)
224
225         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
226         if expire:
227             mode = self.get_config("storage", "expire.mode") # require a mode
228         else:
229             mode = self.get_config("storage", "expire.mode", "age")
230
231         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
232         if o_l_d is not None:
233             o_l_d = parse_duration(o_l_d)
234
235         cutoff_date = None
236         if mode == "cutoff-date":
237             cutoff_date = self.get_config("storage", "expire.cutoff_date")
238             cutoff_date = parse_date(cutoff_date)
239
240         sharetypes = []
241         if self.get_config("storage", "expire.immutable", True, boolean=True):
242             sharetypes.append("immutable")
243         if self.get_config("storage", "expire.mutable", True, boolean=True):
244             sharetypes.append("mutable")
245         expiration_sharetypes = tuple(sharetypes)
246
247         ss = StorageServer(storedir, self.nodeid,
248                            reserved_space=reserved,
249                            discard_storage=discard,
250                            readonly_storage=readonly,
251                            stats_provider=self.stats_provider,
252                            expiration_enabled=expire,
253                            expiration_mode=mode,
254                            expiration_override_lease_duration=o_l_d,
255                            expiration_cutoff_date=cutoff_date,
256                            expiration_sharetypes=expiration_sharetypes)
257         self.add_service(ss)
258
259         d = self.when_tub_ready()
260         # we can't do registerReference until the Tub is ready
261         def _publish(res):
262             furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
263             furl = self.tub.registerReference(ss, furlFile=furl_file)
264             ri_name = RIStorageServer.__remote_name__
265             self.introducer_client.publish(furl, "storage", ri_name)
266         d.addCallback(_publish)
267         d.addErrback(log.err, facility="tahoe.init",
268                      level=log.BAD, umid="aLGBKw")
269
270     def init_client(self):
271         helper_furl = self.get_config("client", "helper.furl", None)
272         DEP = self.DEFAULT_ENCODING_PARAMETERS
273         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
274         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
275         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
276
277         self.init_client_storage_broker()
278         self.history = History(self.stats_provider)
279         self.terminator = Terminator()
280         self.terminator.setServiceParent(self)
281         self.add_service(Uploader(helper_furl, self.stats_provider))
282         self.init_stub_client()
283         self.init_blacklist()
284         self.init_nodemaker()
285
286     def init_client_storage_broker(self):
287         # create a StorageFarmBroker object, for use by Uploader/Downloader
288         # (and everybody else who wants to use storage servers)
289         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
290         self.storage_broker = sb
291
292         # load static server specifications from tahoe.cfg, if any.
293         # Not quite ready yet.
294         #if self.config.has_section("client-server-selection"):
295         #    server_params = {} # maps serverid to dict of parameters
296         #    for (name, value) in self.config.items("client-server-selection"):
297         #        pieces = name.split(".")
298         #        if pieces[0] == "server":
299         #            serverid = pieces[1]
300         #            if serverid not in server_params:
301         #                server_params[serverid] = {}
302         #            server_params[serverid][pieces[2]] = value
303         #    for serverid, params in server_params.items():
304         #        server_type = params.pop("type")
305         #        if server_type == "tahoe-foolscap":
306         #            s = storage_client.NativeStorageClient(*params)
307         #        else:
308         #            msg = ("unrecognized server type '%s' in "
309         #                   "tahoe.cfg [client-server-selection]server.%s.type"
310         #                   % (server_type, serverid))
311         #            raise storage_client.UnknownServerTypeError(msg)
312         #        sb.add_server(s.serverid, s)
313
314         # check to see if we're supposed to use the introducer too
315         if self.get_config("client-server-selection", "use_introducer",
316                            default=True, boolean=True):
317             sb.use_introducer(self.introducer_client)
318
319     def get_storage_broker(self):
320         return self.storage_broker
321
322     def init_stub_client(self):
323         def _publish(res):
324             # we publish an empty object so that the introducer can count how
325             # many clients are connected and see what versions they're
326             # running.
327             sc = StubClient()
328             furl = self.tub.registerReference(sc)
329             ri_name = RIStubClient.__remote_name__
330             self.introducer_client.publish(furl, "stub_client", ri_name)
331         d = self.when_tub_ready()
332         d.addCallback(_publish)
333         d.addErrback(log.err, facility="tahoe.init",
334                      level=log.BAD, umid="OEHq3g")
335
336     def init_blacklist(self):
337         fn = os.path.join(self.basedir, "access.blacklist")
338         self.blacklist = Blacklist(fn)
339
340     def init_nodemaker(self):
341         self.nodemaker = NodeMaker(self.storage_broker,
342                                    self._secret_holder,
343                                    self.get_history(),
344                                    self.getServiceNamed("uploader"),
345                                    self.terminator,
346                                    self.get_encoding_parameters(),
347                                    self._key_generator,
348                                    self.blacklist)
349         default = self.get_config("client", "mutable.format", default="sdmf")
350         if default == "mdmf":
351             self.mutable_file_default = MDMF_VERSION
352         else:
353             self.mutable_file_default = SDMF_VERSION
354
355     def get_history(self):
356         return self.history
357
358     def init_control(self):
359         d = self.when_tub_ready()
360         def _publish(res):
361             c = ControlServer()
362             c.setServiceParent(self)
363             control_url = self.tub.registerReference(c)
364             self.write_private_config("control.furl", control_url + "\n")
365         d.addCallback(_publish)
366         d.addErrback(log.err, facility="tahoe.init",
367                      level=log.BAD, umid="d3tNXA")
368
369     def init_helper(self):
370         d = self.when_tub_ready()
371         def _publish(self):
372             self.helper = Helper(os.path.join(self.basedir, "helper"),
373                                  self.storage_broker, self._secret_holder,
374                                  self.stats_provider, self.history)
375             # TODO: this is confusing. BASEDIR/private/helper.furl is created
376             # by the helper. BASEDIR/helper.furl is consumed by the client
377             # who wants to use the helper. I like having the filename be the
378             # same, since that makes 'cp' work smoothly, but the difference
379             # between config inputs and generated outputs is hard to see.
380             helper_furlfile = os.path.join(self.basedir,
381                                            "private", "helper.furl").encode(get_filesystem_encoding())
382             self.tub.registerReference(self.helper, furlFile=helper_furlfile)
383         d.addCallback(_publish)
384         d.addErrback(log.err, facility="tahoe.init",
385                      level=log.BAD, umid="K0mW5w")
386
387     def init_key_gen(self, key_gen_furl):
388         d = self.when_tub_ready()
389         def _subscribe(self):
390             self.tub.connectTo(key_gen_furl, self._got_key_generator)
391         d.addCallback(_subscribe)
392         d.addErrback(log.err, facility="tahoe.init",
393                      level=log.BAD, umid="z9DMzw")
394
395     def _got_key_generator(self, key_generator):
396         self._key_generator.set_remote_generator(key_generator)
397         key_generator.notifyOnDisconnect(self._lost_key_generator)
398
399     def _lost_key_generator(self):
400         self._key_generator.set_remote_generator(None)
401
402     def set_default_mutable_keysize(self, keysize):
403         self._key_generator.set_default_keysize(keysize)
404
405     def init_web(self, webport):
406         self.log("init_web(webport=%s)", args=(webport,))
407
408         from allmydata.webish import WebishServer
409         nodeurl_path = os.path.join(self.basedir, "node.url")
410         staticdir = self.get_config("node", "web.static", "public_html")
411         staticdir = os.path.expanduser(staticdir)
412         ws = WebishServer(self, webport, nodeurl_path, staticdir)
413         self.add_service(ws)
414
415     def init_ftp_server(self):
416         if self.get_config("ftpd", "enabled", False, boolean=True):
417             accountfile = self.get_config("ftpd", "accounts.file", None)
418             accounturl = self.get_config("ftpd", "accounts.url", None)
419             ftp_portstr = self.get_config("ftpd", "port", "8021")
420
421             from allmydata.frontends import ftpd
422             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
423             s.setServiceParent(self)
424
425     def init_sftp_server(self):
426         if self.get_config("sftpd", "enabled", False, boolean=True):
427             accountfile = self.get_config("sftpd", "accounts.file", None)
428             accounturl = self.get_config("sftpd", "accounts.url", None)
429             sftp_portstr = self.get_config("sftpd", "port", "8022")
430             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
431             privkey_file = self.get_config("sftpd", "host_privkey_file")
432
433             from allmydata.frontends import sftpd
434             s = sftpd.SFTPServer(self, accountfile, accounturl,
435                                  sftp_portstr, pubkey_file, privkey_file)
436             s.setServiceParent(self)
437
438     def init_drop_uploader(self):
439         if self.get_config("drop_upload", "enabled", False, boolean=True):
440             upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
441             local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
442
443             if upload_dircap and local_dir_utf8:
444                 try:
445                     from allmydata.frontends import drop_upload
446                     s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
447                     s.setServiceParent(self)
448                     s.startService()
449                 except Exception, e:
450                     self.log("couldn't start drop-uploader: %r", args=(e,))
451             else:
452                 self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
453
454     def _check_hotline(self, hotline_file):
455         if os.path.exists(hotline_file):
456             mtime = os.stat(hotline_file)[stat.ST_MTIME]
457             if mtime > time.time() - 120.0:
458                 return
459             else:
460                 self.log("hotline file too old, shutting down")
461         else:
462             self.log("hotline file missing, shutting down")
463         reactor.stop()
464
465     def get_encoding_parameters(self):
466         return self.DEFAULT_ENCODING_PARAMETERS
467
468     def connected_to_introducer(self):
469         if self.introducer_client:
470             return self.introducer_client.connected_to_introducer()
471         return False
472
473     def get_renewal_secret(self): # this will go away
474         return self._secret_holder.get_renewal_secret()
475
476     def get_cancel_secret(self):
477         return self._secret_holder.get_cancel_secret()
478
479     def debug_wait_for_client_connections(self, num_clients):
480         """Return a Deferred that fires (with None) when we have connections
481         to the given number of peers. Useful for tests that set up a
482         temporary test network and need to know when it is safe to proceed
483         with an upload or download."""
484         def _check():
485             return len(self.storage_broker.get_connected_servers()) >= num_clients
486         d = self.poll(_check, 0.5)
487         d.addCallback(lambda res: None)
488         return d
489
490
491     # these four methods are the primitives for creating filenodes and
492     # dirnodes. The first takes a URI and produces a filenode or (new-style)
493     # dirnode. The other three create brand-new filenodes/dirnodes.
494
495     def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
496         # This returns synchronously.
497         # Note that it does *not* validate the write_uri and read_uri; instead we
498         # may get an opaque node if there were any problems.
499         return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
500
501     def create_dirnode(self, initial_children={}, version=SDMF_VERSION):
502         d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
503         return d
504
505     def create_immutable_dirnode(self, children, convergence=None):
506         return self.nodemaker.create_immutable_directory(children, convergence)
507
508     def create_mutable_file(self, contents=None, keysize=None, version=None):
509         if not version:
510             version = self.mutable_file_default
511         return self.nodemaker.create_mutable_file(contents, keysize,
512                                                   version=version)
513
514     def upload(self, uploadable):
515         uploader = self.getServiceNamed("uploader")
516         return uploader.upload(uploadable, history=self.get_history())