]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
Use a private/drop_upload_dircap file instead of the [drop_upload]upload.dircap optio...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26                                  SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28 from allmydata.blacklist import Blacklist
29 from allmydata.node import OldConfigOptionError
30
31
# Binary size units, used by the encoding parameters and config parsing.
KiB = 1 << 10
MiB = 1 << 20
GiB = 1 << 30
TiB = 1 << 40
PiB = 1 << 50
37
class StubClient(Referenceable):
    """An empty Referenceable published to the introducer so it can count
    connected clients and see what versions they are running (see
    Client.init_stub_client)."""
    implements(RIStubClient)
40
def _make_secret():
    """Return a freshly generated secret: CRYPTO_VAL_SIZE random bytes,
    base32-encoded and newline-terminated (suitable for a private/ file)."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
43
class SecretHolder:
    """Holds a client's lease secret and convergence secret, and derives
    the per-client lease renewal/cancel secrets from the former."""

    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        """Return the lease-renewal secret derived from my lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the lease-cancel secret derived from my lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        """Return the convergence secret, unmodified."""
        return self._convergence_secret
57
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None  # optional remote key-generator reference
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        """Use 'keygen' (a remote reference that offers 'get_rsa_key_pair')
        for future keypairs; pass None to fall back to local generation."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller keys, so the unit tests
        run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (2048 bit keys are standard, smaller
        keys are used for testing). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs(keypair):
                # Tuple-parameter unpacking in the 'def' line was removed by
                # PEP 3113; unpack explicitly so this works on Python 2 and 3.
                verifying_key, signing_key = keypair
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
100
class Terminator(service.Service):
    """A service that remembers registered objects (via weak references,
    so registration does not keep them alive) and calls stop() on each
    surviving one when the service itself is stopped."""

    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Arrange for c.stop() to be called at shutdown, unless c has
        already been garbage-collected by then."""
        self._clients[c] = None

    def stopService(self):
        for client in self._clients:
            client.stop()
        return service.Service.stopService(self)
110
111
class Client(node.Node, pollmixin.PollMixin):
    """The Tahoe client node: a Node that also starts the client-side
    services (introducer client, stats provider, optional storage server,
    uploader/storage broker, optional helper and remote key generator,
    web/FTP/SFTP frontends, and the drop-uploader)."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # if this file exists at startup, a TimerService will shut the node
    # down once the file stops being touched (see __init__/_check_hotline)
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a dict with keys "k" (needed), "happy" (desired), "n" (total)
    # and "max_segment_size". 'k' is the number of shares required to
    # reconstruct a file. 'happy' means that we will abort an upload unless
    # we can allocate space for at least this many. 'n' is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        """Initialize the node from the config files under 'basedir' and
        start all enabled services."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # copy the class-level dict so the per-instance overrides applied
        # in init_client() don't mutate the shared class attribute
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll every second; _check_hotline stops the reactor once the
            # file is missing or more than 120s old
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        """Create the IntroducerClient and schedule it to start once the
        Tub is up. [client]introducer.furl is mandatory."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        """Create the StatsProvider service and register this node as a
        producer (it contributes 'node.uptime', see get_stats)."""
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        """IStatsProducer entry point: report uptime in seconds."""
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        """Load (creating if necessary) the lease and convergence secrets
        from private/secret and private/convergence, and build the
        SecretHolder used by the rest of the node."""
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_storage(self):
        """If [storage]enabled, create a StorageServer from the [storage]
        config options and publish its FURL through the introducer once
        the Tub is ready."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            # bad value: log it and fall through to reserved=0 below
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        # which share types the lease-expirer is allowed to delete
        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        """Apply the [client]shares.* overrides to the (per-instance copy
        of the) encoding parameters, then build the storage broker,
        history, terminator, uploader, stub client, blacklist, and
        nodemaker."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider,
                                  self.history))
        self.init_stub_client()
        self.init_blacklist()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        """Return the StorageFarmBroker created by init_client_storage_broker."""
        return self.storage_broker

    def init_stub_client(self):
        """Register an empty StubClient with the introducer (once the Tub
        is ready) purely so the introducer can count/identify us."""
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_blacklist(self):
        """Create the Blacklist backed by BASEDIR/access.blacklist."""
        fn = os.path.join(self.basedir, "access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        """Build the NodeMaker, honoring [client]mutable.format (anything
        other than "MDMF", including absence, selects SDMF)."""
        default = self.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        """Return the History object created by init_client."""
        return self.history

    def init_control(self):
        """Once the Tub is ready, attach a ControlServer and write its FURL
        to private/control.furl."""
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        """Create the upload Helper and register it on the Tub once ready."""
        d = self.when_tub_ready()
        # NOTE(review): the callback parameter below is named 'self',
        # shadowing the method's own 'self'. This only works if
        # when_tub_ready() fires its Deferred with the node instance itself
        # -- confirm against Node.when_tub_ready before changing anything
        # here.
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        """Once the Tub is ready, connect to the remote key generator at
        'key_gen_furl' (see _got_key_generator)."""
        d = self.when_tub_ready()
        # NOTE(review): as in init_helper, the callback parameter shadows
        # 'self'; this relies on when_tub_ready() firing with the node
        # itself -- verify before modifying.
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        # connected: route keypair generation through the remote service,
        # and fall back to local generation if it disconnects
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        # remote generator went away: revert to local RSA generation
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        """Override the RSA keysize used for new mutable files (mainly for
        tests; see KeyGenerator.set_default_keysize)."""
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        """Start the web frontend listening on 'webport' (a strports
        string); [node]web.static names the static-file directory."""
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        """Start the FTP frontend if [ftpd]enabled (default port 8021)."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        """Start the SFTP frontend if [sftpd]enabled (default port 8022);
        host key file options are mandatory when enabled."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def init_drop_uploader(self):
        """Start the drop-uploader if [drop_upload]enabled. The dircap now
        lives in private/drop_upload_dircap; the old
        [drop_upload]upload.dircap option is rejected with an error."""
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            if self.get_config("drop_upload", "upload.dircap", None):
                raise OldConfigOptionError("The [drop_upload]upload.dircap option is no longer supported; please "
                                           "put the cap in a 'private/drop_upload_dircap' file, and delete this option.")

            upload_dircap = self.get_or_create_private_config("drop_upload_dircap")
            local_dir_utf8 = self.get_config("drop_upload", "local.directory")

            # startup failures here are deliberately non-fatal: log and
            # keep the rest of the node running
            try:
                from allmydata.frontends import drop_upload
                s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                s.setServiceParent(self)
                s.startService()
            except Exception, e:
                self.log("couldn't start drop-uploader: %r", args=(e,))

    def _check_hotline(self, hotline_file):
        """TimerService callback: stop the reactor unless the hotline file
        exists and was touched within the last 120 seconds."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_encoding_parameters(self):
        """Return this instance's (possibly config-overridden) encoding
        parameter dict."""
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        """Return True if the introducer client exists and is connected."""
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}, version=None):
        # NOTE(review): mutable default argument; appears safe because it is
        # only passed through to the nodemaker -- confirm nodemaker never
        # mutates it.
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        """Create an immutable dirnode via the nodemaker."""
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        """Create a mutable file via the nodemaker; returns a Deferred."""
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        """Hand 'uploadable' to the Uploader service created in init_client."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)