]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.abbreviate import parse_abbreviated_size
21 from allmydata.util.time_format import parse_duration, parse_date
22 from allmydata.stats import StatsProvider
23 from allmydata.history import History
24 from allmydata.interfaces import IStatsProducer, RIStubClient
25 from allmydata.nodemaker import NodeMaker
26
27
# Binary size units, in powers of 1024.
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
33
class StubClient(Referenceable):
    """An empty remotely-referenceable object. It is published to the
    introducer (see init_stub_client) so the introducer can count connected
    clients and see what versions they are running."""
    implements(RIStubClient)
36
def _make_secret():
    """Return a fresh random secret: base32-encoded, newline-terminated."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
39
class SecretHolder:
    """Holds the node's master lease secret, and derives the renewal and
    cancel secrets from it on demand."""

    def __init__(self, lease_secret):
        # the master secret; both derived secrets are hashes of it
        self._lease_secret = lease_secret

    def get_renewal_secret(self):
        """Return the lease-renewal secret derived from the master secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the lease-cancel secret derived from the master secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)
49
class KeyGenerator:
    """Produces RSA keypairs for new mutable files, either locally or by
    delegating to a remote key-generator service when one is connected."""

    def __init__(self):
        self._remote = None  # RemoteReference to a key generator, or None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        """Use the given remote key generator; pass None to go back to
        local generation."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size of the RSA keys created for new
        mutable files. The default is 2048 bits."""
        # (the old docstring claimed the default was None; __init__ has
        # always set it to 2048)
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """Return a Deferred that fires with a (verifying_key, signing_key)
        pycryptopp keypair. 'keysize' is in bits; it defaults to
        self.default_keysize."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs(keys):
                # explicit unpacking instead of py2-only tuple parameters
                verifying_key, signing_key = keys
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
79
80
class Client(node.Node, pollmixin.PollMixin):
    """The main Tahoe client node: wires together the introducer client,
    the (optional) storage server, the storage broker, uploader/downloader,
    helper, key generator, and the web/FTP/SFTP frontends."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # per-instance copy, so the config overrides applied in
        # init_client() cannot mutate the class-level defaults
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def read_old_config_files(self):
        """Migrate pre-tahoe.cfg single-purpose config files into the
        in-memory config, for backwards compatibility."""
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")

    def init_introducer_client(self):
        """Create the IntroducerClient and arrange for it to start once the
        Tub is ready."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        """Create the StatsProvider service and register ourselves as a
        producer of node-level stats."""
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        """IStatsProducer: report this node's uptime in seconds."""
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
        """Load (or create) the master lease secret and wrap it in a
        SecretHolder."""
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        lease_secret = base32.a2b(secret_s)
        self._secret_holder = SecretHolder(lease_secret)

    def init_storage(self):
        """If [storage]enabled, create a StorageServer (with the configured
        space-reservation and lease-expiration policy) and publish its FURL
        via the introducer once the Tub is ready."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        """Set up the client-side services: encoding parameters, convergence
        secret, storage broker, history, uploader, downloader, download
        cache, stub client, and nodemaker."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)

        self.init_client_storage_broker()
        self.history = self.add_service(History(self.stats_provider))
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache_dirman.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.init_stub_client()
        self.init_nodemaker()

    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_nodemaker(self):
        """Create the NodeMaker, the factory for filenodes and dirnodes."""
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.getServiceNamed("downloader"),
                                   self.download_cache_dirman,
                                   self.get_encoding_parameters(),
                                   self._key_generator)

    def get_history(self):
        return self.getServiceNamed("history")

    def init_control(self):
        """Attach a ControlServer (for debugging/testing) once the Tub is
        ready, and record its FURL in private/control.furl."""
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        """Run an upload helper and register it with the Tub once ready."""
        d = self.when_tub_ready()
        # NOTE: the callback parameter used to be named 'self', shadowing the
        # method's self with the Deferred result; 'res' matches the other
        # tub-ready callbacks and lets the closure see the real node.
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"),
                       self.stats_provider, self.history)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        """Arrange to connect to a remote key-generator service once the Tub
        is ready."""
        d = self.when_tub_ready()
        # 'res' (not 'self') so the closure sees the real node; see
        # init_helper for the same pattern
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator.set_remote_generator(key_generator)
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator.set_remote_generator(None)

    def set_default_mutable_keysize(self, keysize):
        self._key_generator.set_default_keysize(keysize)

    def init_web(self, webport):
        """Start the webish frontend on the given strports descriptor."""
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        """If [ftpd]enabled, start the FTP frontend."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        """If [sftpd]enabled, start the SFTP frontend."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def _check_hotline(self, hotline_file):
        """TimerService callback: shut down the reactor unless the hotline
        file exists and has been touched within the last two minutes."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        """Return True if we currently have an introducer connection."""
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self): # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_all_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, writecap, readcap=None):
        # this returns synchronously.
        return self.nodemaker.create_from_cap(writecap, readcap)

    def create_empty_dirnode(self):
        return self.nodemaker.create_new_mutable_directory()

    def create_mutable_file(self, contents="", keysize=None):
        return self.nodemaker.create_mutable_file(contents, keysize)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())