import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IStatsProducer, RIStubClient
from allmydata.nodemaker import NodeMaker


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class SecretHolder:
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is specified first by the keysize= argument
    to generate(), then with a default set by set_default_keysize(), then
    with a built-in default of 2048 bits."""
    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        self._remote = keygen
    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files which don't otherwise specify a size. This will affect
        all subsequent calls to generate() without a keysize= argument. The
        default size is 2048 bits. Test cases should call this method once
        during setup, to cause me to create smaller (522 bit) keys, so the
        unit tests run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. I accept a keysize in bits (522 bit keys are fast for testing,
        2048 bit keys are standard). If you do not provide a keysize, I will
        use my default, which is set by a call to set_default_keysize(). If
        set_default_keysize() has never been called, I will create 2048 bit
        keys."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )

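# A minimal usage sketch for KeyGenerator (commented out so importing this
# module stays side-effect free; it assumes a running Twisted reactor, since
# generate() returns a Deferred, and that pycryptopp key objects expose a
# serialize() method, as the create_*_from_string() calls above suggest):
#
#   keygen = KeyGenerator()
#   keygen.set_default_keysize(522)   # tiny keys, e.g. for unit tests
#   d = keygen.generate()
#   def _got_keys((verifying_key, signing_key)):
#       print len(verifying_key.serialize()), "byte verifying key"
#   d.addCallback(_got_keys)
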
class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of encoding parameters. 'k' (aka "needed") is the
    # number of shares required to reconstruct a file. 'happy' (aka
    # "desired") means that we will abort an upload unless we can allocate
    # space for at least this many. 'n' (aka "total") is the total number
    # of shares created by encoding. If everybody has room then this is how
    # many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
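    # Example: with these defaults a file is erasure-coded into n=10 shares,
    # any k=3 of which suffice to reconstruct it, so each share is roughly
    # 1/k (about a third) of the file's size, plus hashing overhead.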

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_secrets()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()
        self.init_drop_uploader()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so that we'll have a useful address on our RemoteReference
        # and the introducer's status page can show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_secrets(self):
        lease_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.get_or_create_private_config('convergence',
                                                          _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

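        # For reference, a hypothetical tahoe.cfg [storage] stanza exercising
        # the options read above might look like (values are illustrative):
        #
        #   [storage]
        #   enabled = true
        #   readonly = false
        #   reserved_space = 1G
        #   expire.enabled = true
        #   expire.mode = age
        #   expire.override_lease_duration = 33 days
        #   expire.immutable = true
        #   expire.mutable = true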
        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))

        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.init_stub_client()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_nodemaker(self):
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self._key_generator)

    def get_history(self):
        return self.history

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(self):
            # NOTE: when_tub_ready() fires with this Client instance, so
            # this 'self' parameter intentionally shadows (and equals) the
            # outer self.
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(self):
            # as in init_helper, when_tub_ready() fires with this Client
            # instance, so this 'self' shadows (and equals) the outer self.
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))
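        # webport is a Twisted "strports" description; an illustrative value
        # would be "tcp:3456:interface=127.0.0.1" (serve the web UI on
        # localhost port 3456).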

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

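    # A hypothetical tahoe.cfg [sftpd] stanza matching the options read
    # above (values are illustrative; the file names are placeholders):
    #
    #   [sftpd]
    #   enabled = true
    #   port = 8022
    #   host_pubkey_file = private/ssh_host_rsa_key.pub
    #   host_privkey_file = private/ssh_host_rsa_key
    #   accounts.file = private/sftp_accounts
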
    def init_drop_uploader(self):
        if self.get_config("drop_upload", "enabled", False, boolean=True):
            upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
            local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)

            if upload_dircap and local_dir_utf8:
                try:
                    from allmydata.frontends import drop_upload
                    s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                    s.setServiceParent(self)
                    s.start()
                except Exception, e:
                    self.log("couldn't start drop-uploader: %r", args=(e,))
            else:
                self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")

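    # A hypothetical tahoe.cfg [drop_upload] stanza matching the options
    # read above (the dircap is a placeholder, not a real cap):
    #
    #   [drop_upload]
    #   enabled = true
    #   upload.dircap = URI:DIR2:xxxx...
    #   local.directory = ~/drop-upload
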
    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children={}):
        d = self.nodemaker.create_new_mutable_directory(initial_children)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None):
        return self.nodemaker.create_mutable_file(contents, keysize)
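
    # A commented-out usage sketch of the node-creation primitives above
    # (assumes a Client that has finished starting up; create_dirnode
    # returns a Deferred, while create_node_from_uri is synchronous):
    #
    #   d = client.create_dirnode()
    #   def _created(dirnode):
    #       print dirnode.get_uri()
    #       return client.create_node_from_uri(dirnode.get_uri())
    #   d.addCallback(_created)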

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())