]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
replace Client.create_empty_dirnode() with create_dirnode(), in anticipation
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
20 from allmydata.util.abbreviate import parse_abbreviated_size
21 from allmydata.util.time_format import parse_duration, parse_date
22 from allmydata.stats import StatsProvider
23 from allmydata.history import History
24 from allmydata.interfaces import IStatsProducer, RIStubClient
25 from allmydata.nodemaker import NodeMaker
26
27
# Binary (power-of-two) size units; each one is 1024 times the previous.
KiB = 2 ** 10
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
33
class StubClient(Referenceable):
    # An empty remotely-referenceable object that declares RIStubClient.
    # Client.init_stub_client() registers one with the Tub and publishes it
    # so that the introducer can count how many clients are connected and
    # see what versions they are running.
    implements(RIStubClient)
36
def _make_secret():
    """Return a fresh random secret as text: hashutil.CRYPTO_VAL_SIZE bytes
    from os.urandom(), base32-encoded, followed by a newline."""
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + "\n"
39
class SecretHolder:
    """Holds the node's lease secret and derives the renewal and cancel
    secrets from it on demand."""

    def __init__(self, lease_secret):
        self._lease_secret = lease_secret

    def get_cancel_secret(self):
        """Return hashutil.my_cancel_secret_hash() of the lease secret."""
        secret = self._lease_secret
        return hashutil.my_cancel_secret_hash(secret)

    def get_renewal_secret(self):
        """Return hashutil.my_renewal_secret_hash() of the lease secret."""
        secret = self._lease_secret
        return hashutil.my_renewal_secret_hash(secret)
49
class KeyGenerator:
    """Produces RSA key pairs for new mutable files.

    Keys are generated locally with pycryptopp unless a remote generator
    has been attached with set_remote_generator(), in which case the work
    is delegated to it through callRemote('get_rsa_key_pair', keysize)."""

    def __init__(self):
        self._remote = None
        self.default_keysize = 2048

    def set_remote_generator(self, keygen):
        """Delegate key generation to 'keygen', an object offering
        callRemote(). Pass None to go back to generating keys locally."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Call this to override the size (in bits) of the RSA keys created
        for new mutable files. Until it is called the default is 2048."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """Return a Deferred that fires with a (verifying_key, signing_key)
        pair. A false 'keysize' (None or 0) means self.default_keysize."""
        keysize = keysize or self.default_keysize
        if self._remote:
            d = self._remote.callRemote('get_rsa_key_pair', keysize)
            def make_key_objs(key_strings):
                # The remote side answers with two serialized keys. Unpack
                # them here in the body: tuple parameters in a 'def' line
                # are Python-2-only syntax.
                (verifying_key, signing_key) = key_strings
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(keysize)
            verifier = signer.get_verifying_key()
            return defer.succeed( (verifier, signer) )
79
80
class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    # file names and type tag used by this kind of node
    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # while this file exists in the basedir and is kept fresh, the node
    # keeps running; see __init__ and _check_hotline
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total) and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
103
    def __init__(self, basedir="."):
        """Set up a client node rooted at 'basedir'.

        Runs the init_* steps in a fixed order: introducer client, stats
        provider, lease secret, storage server, control port, optional
        helper, key generator, client-side services, FTP/SFTP frontends,
        the optional hotline timer, and finally the web server."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # take a per-instance copy, so that init_client() can override the
        # entries without modifying the class-level dict
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        self.helper = None
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self._key_generator = KeyGenerator()
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        self.init_client()
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # If the hotline file is present at startup, check on it once per
        # timer tick (1.0): _check_hotline stops the reactor when the file
        # disappears or has not been touched recently.
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
139
140     def read_old_config_files(self):
141         node.Node.read_old_config_files(self)
142         copy = self._copy_config_from_file
143         copy("introducer.furl", "client", "introducer.furl")
144         copy("helper.furl", "client", "helper.furl")
145         copy("key_generator.furl", "client", "key_generator.furl")
146         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
147         if os.path.exists(os.path.join(self.basedir, "no_storage")):
148             self.set_config("storage", "enabled", "false")
149         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
150             self.set_config("storage", "readonly", "true")
151         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
152             self.set_config("storage", "debug_discard", "true")
153         if os.path.exists(os.path.join(self.basedir, "run_helper")):
154             self.set_config("helper", "enabled", "true")
155
156     def init_introducer_client(self):
157         self.introducer_furl = self.get_config("client", "introducer.furl")
158         ic = IntroducerClient(self.tub, self.introducer_furl,
159                               self.nickname,
160                               str(allmydata.__full_version__),
161                               str(self.OLDEST_SUPPORTED_VERSION))
162         self.introducer_client = ic
163         # hold off on starting the IntroducerClient until our tub has been
164         # started, so we'll have a useful address on our RemoteReference, so
165         # that the introducer's status page will show us.
166         d = self.when_tub_ready()
167         def _start_introducer_client(res):
168             ic.setServiceParent(self)
169         d.addCallback(_start_introducer_client)
170         d.addErrback(log.err, facility="tahoe.init",
171                      level=log.BAD, umid="URyI5w")
172
173     def init_stats_provider(self):
174         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
175         self.stats_provider = StatsProvider(self, gatherer_furl)
176         self.add_service(self.stats_provider)
177         self.stats_provider.register_producer(self)
178
179     def get_stats(self):
180         return { 'node.uptime': time.time() - self.started_timestamp }
181
182     def init_lease_secret(self):
183         secret_s = self.get_or_create_private_config("secret", _make_secret)
184         lease_secret = base32.a2b(secret_s)
185         self._secret_holder = SecretHolder(lease_secret)
186
187     def init_storage(self):
188         # should we run a storage server (and publish it for others to use)?
189         if not self.get_config("storage", "enabled", True, boolean=True):
190             return
191         readonly = self.get_config("storage", "readonly", False, boolean=True)
192
193         storedir = os.path.join(self.basedir, self.STOREDIR)
194
195         data = self.get_config("storage", "reserved_space", None)
196         reserved = None
197         try:
198             reserved = parse_abbreviated_size(data)
199         except ValueError:
200             log.msg("[storage]reserved_space= contains unparseable value %s"
201                     % data)
202         if reserved is None:
203             reserved = 0
204         discard = self.get_config("storage", "debug_discard", False,
205                                   boolean=True)
206
207         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
208         if expire:
209             mode = self.get_config("storage", "expire.mode") # require a mode
210         else:
211             mode = self.get_config("storage", "expire.mode", "age")
212
213         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
214         if o_l_d is not None:
215             o_l_d = parse_duration(o_l_d)
216
217         cutoff_date = None
218         if mode == "cutoff-date":
219             cutoff_date = self.get_config("storage", "expire.cutoff_date")
220             cutoff_date = parse_date(cutoff_date)
221
222         sharetypes = []
223         if self.get_config("storage", "expire.immutable", True, boolean=True):
224             sharetypes.append("immutable")
225         if self.get_config("storage", "expire.mutable", True, boolean=True):
226             sharetypes.append("mutable")
227         expiration_sharetypes = tuple(sharetypes)
228
229         ss = StorageServer(storedir, self.nodeid,
230                            reserved_space=reserved,
231                            discard_storage=discard,
232                            readonly_storage=readonly,
233                            stats_provider=self.stats_provider,
234                            expiration_enabled=expire,
235                            expiration_mode=mode,
236                            expiration_override_lease_duration=o_l_d,
237                            expiration_cutoff_date=cutoff_date,
238                            expiration_sharetypes=expiration_sharetypes)
239         self.add_service(ss)
240
241         d = self.when_tub_ready()
242         # we can't do registerReference until the Tub is ready
243         def _publish(res):
244             furl_file = os.path.join(self.basedir, "private", "storage.furl")
245             furl = self.tub.registerReference(ss, furlFile=furl_file)
246             ri_name = RIStorageServer.__remote_name__
247             self.introducer_client.publish(furl, "storage", ri_name)
248         d.addCallback(_publish)
249         d.addErrback(log.err, facility="tahoe.init",
250                      level=log.BAD, umid="aLGBKw")
251
252     def init_client(self):
253         helper_furl = self.get_config("client", "helper.furl", None)
254         DEP = self.DEFAULT_ENCODING_PARAMETERS
255         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
256         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
257         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
258         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
259         self.convergence = base32.a2b(convergence_s)
260
261         self.init_client_storage_broker()
262         self.history = History(self.stats_provider)
263         self.add_service(Uploader(helper_furl, self.stats_provider))
264         download_cachedir = os.path.join(self.basedir,
265                                          "private", "cache", "download")
266         self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
267         self.download_cache_dirman.setServiceParent(self)
268         self.downloader = Downloader(self.storage_broker, self.stats_provider)
269         self.init_stub_client()
270         self.init_nodemaker()
271
272     def init_client_storage_broker(self):
273         # create a StorageFarmBroker object, for use by Uploader/Downloader
274         # (and everybody else who wants to use storage servers)
275         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
276         self.storage_broker = sb
277
278         # load static server specifications from tahoe.cfg, if any.
279         # Not quite ready yet.
280         #if self.config.has_section("client-server-selection"):
281         #    server_params = {} # maps serverid to dict of parameters
282         #    for (name, value) in self.config.items("client-server-selection"):
283         #        pieces = name.split(".")
284         #        if pieces[0] == "server":
285         #            serverid = pieces[1]
286         #            if serverid not in server_params:
287         #                server_params[serverid] = {}
288         #            server_params[serverid][pieces[2]] = value
289         #    for serverid, params in server_params.items():
290         #        server_type = params.pop("type")
291         #        if server_type == "tahoe-foolscap":
292         #            s = storage_client.NativeStorageClient(*params)
293         #        else:
294         #            msg = ("unrecognized server type '%s' in "
295         #                   "tahoe.cfg [client-server-selection]server.%s.type"
296         #                   % (server_type, serverid))
297         #            raise storage_client.UnknownServerTypeError(msg)
298         #        sb.add_server(s.serverid, s)
299
300         # check to see if we're supposed to use the introducer too
301         if self.get_config("client-server-selection", "use_introducer",
302                            default=True, boolean=True):
303             sb.use_introducer(self.introducer_client)
304
305     def get_storage_broker(self):
306         return self.storage_broker
307
308     def init_stub_client(self):
309         def _publish(res):
310             # we publish an empty object so that the introducer can count how
311             # many clients are connected and see what versions they're
312             # running.
313             sc = StubClient()
314             furl = self.tub.registerReference(sc)
315             ri_name = RIStubClient.__remote_name__
316             self.introducer_client.publish(furl, "stub_client", ri_name)
317         d = self.when_tub_ready()
318         d.addCallback(_publish)
319         d.addErrback(log.err, facility="tahoe.init",
320                      level=log.BAD, umid="OEHq3g")
321
322     def init_nodemaker(self):
323         self.nodemaker = NodeMaker(self.storage_broker,
324                                    self._secret_holder,
325                                    self.get_history(),
326                                    self.getServiceNamed("uploader"),
327                                    self.downloader,
328                                    self.download_cache_dirman,
329                                    self.get_encoding_parameters(),
330                                    self._key_generator)
331
332     def get_history(self):
333         return self.history
334
335     def init_control(self):
336         d = self.when_tub_ready()
337         def _publish(res):
338             c = ControlServer()
339             c.setServiceParent(self)
340             control_url = self.tub.registerReference(c)
341             self.write_private_config("control.furl", control_url + "\n")
342         d.addCallback(_publish)
343         d.addErrback(log.err, facility="tahoe.init",
344                      level=log.BAD, umid="d3tNXA")
345
346     def init_helper(self):
347         d = self.when_tub_ready()
348         def _publish(self):
349             self.helper = Helper(os.path.join(self.basedir, "helper"),
350                                  self.storage_broker, self._secret_holder,
351                                  self.stats_provider, self.history)
352             # TODO: this is confusing. BASEDIR/private/helper.furl is created
353             # by the helper. BASEDIR/helper.furl is consumed by the client
354             # who wants to use the helper. I like having the filename be the
355             # same, since that makes 'cp' work smoothly, but the difference
356             # between config inputs and generated outputs is hard to see.
357             helper_furlfile = os.path.join(self.basedir,
358                                            "private", "helper.furl")
359             self.tub.registerReference(self.helper, furlFile=helper_furlfile)
360         d.addCallback(_publish)
361         d.addErrback(log.err, facility="tahoe.init",
362                      level=log.BAD, umid="K0mW5w")
363
364     def init_key_gen(self, key_gen_furl):
365         d = self.when_tub_ready()
366         def _subscribe(self):
367             self.tub.connectTo(key_gen_furl, self._got_key_generator)
368         d.addCallback(_subscribe)
369         d.addErrback(log.err, facility="tahoe.init",
370                      level=log.BAD, umid="z9DMzw")
371
372     def _got_key_generator(self, key_generator):
373         self._key_generator.set_remote_generator(key_generator)
374         key_generator.notifyOnDisconnect(self._lost_key_generator)
375
376     def _lost_key_generator(self):
377         self._key_generator.set_remote_generator(None)
378
379     def set_default_mutable_keysize(self, keysize):
380         self._key_generator.set_default_keysize(keysize)
381
382     def init_web(self, webport):
383         self.log("init_web(webport=%s)", args=(webport,))
384
385         from allmydata.webish import WebishServer
386         nodeurl_path = os.path.join(self.basedir, "node.url")
387         staticdir = self.get_config("node", "web.static", "public_html")
388         staticdir = os.path.expanduser(staticdir)
389         ws = WebishServer(self, webport, nodeurl_path, staticdir)
390         self.add_service(ws)
391
392     def init_ftp_server(self):
393         if self.get_config("ftpd", "enabled", False, boolean=True):
394             accountfile = self.get_config("ftpd", "accounts.file", None)
395             accounturl = self.get_config("ftpd", "accounts.url", None)
396             ftp_portstr = self.get_config("ftpd", "port", "8021")
397
398             from allmydata.frontends import ftpd
399             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
400             s.setServiceParent(self)
401
402     def init_sftp_server(self):
403         if self.get_config("sftpd", "enabled", False, boolean=True):
404             accountfile = self.get_config("sftpd", "accounts.file", None)
405             accounturl = self.get_config("sftpd", "accounts.url", None)
406             sftp_portstr = self.get_config("sftpd", "port", "8022")
407             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
408             privkey_file = self.get_config("sftpd", "host_privkey_file")
409
410             from allmydata.frontends import sftpd
411             s = sftpd.SFTPServer(self, accountfile, accounturl,
412                                  sftp_portstr, pubkey_file, privkey_file)
413             s.setServiceParent(self)
414
415     def _check_hotline(self, hotline_file):
416         if os.path.exists(hotline_file):
417             mtime = os.stat(hotline_file)[stat.ST_MTIME]
418             if mtime > time.time() - 120.0:
419                 return
420             else:
421                 self.log("hotline file too old, shutting down")
422         else:
423             self.log("hotline file missing, shutting down")
424         reactor.stop()
425
426     def get_encoding_parameters(self):
427         return self.DEFAULT_ENCODING_PARAMETERS
428
429     def connected_to_introducer(self):
430         if self.introducer_client:
431             return self.introducer_client.connected_to_introducer()
432         return False
433
434     def get_renewal_secret(self): # this will go away
435         return self._secret_holder.get_renewal_secret()
436
437     def get_cancel_secret(self):
438         return self._secret_holder.get_cancel_secret()
439
440     def debug_wait_for_client_connections(self, num_clients):
441         """Return a Deferred that fires (with None) when we have connections
442         to the given number of peers. Useful for tests that set up a
443         temporary test network and need to know when it is safe to proceed
444         with an upload or download."""
445         def _check():
446             return len(self.storage_broker.get_all_servers()) >= num_clients
447         d = self.poll(_check, 0.5)
448         d.addCallback(lambda res: None)
449         return d
450
451
452     # these four methods are the primitives for creating filenodes and
453     # dirnodes. The first takes a URI and produces a filenode or (new-style)
454     # dirnode. The other three create brand-new filenodes/dirnodes.
455
456     def create_node_from_uri(self, writecap, readcap=None):
457         # this returns synchronously.
458         return self.nodemaker.create_from_cap(writecap, readcap)
459
460     def create_dirnode(self, initial_children={}):
461         d = self.nodemaker.create_new_mutable_directory()
462         assert not initial_children, "not ready yet: %s" % (initial_children,)
463         if initial_children:
464             d.addCallback(lambda n: n.set_children(initial_children))
465         return d
466
467     def create_mutable_file(self, contents="", keysize=None):
468         return self.nodemaker.create_mutable_file(contents, keysize)
469
470     def upload(self, uploadable):
471         uploader = self.getServiceNamed("uploader")
472         return uploader.upload(uploadable, history=self.get_history())