]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
3cb4cd66644722480529311ebae93c9d7a151696
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor, defer
7 from twisted.application import service
8 from twisted.application.internet import TimerService
9 from foolscap.api import Referenceable
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata import storage_client
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.offloaded import Helper
17 from allmydata.control import ControlServer
18 from allmydata.introducer.client import IntroducerClient
19 from allmydata.util import hashutil, base32, pollmixin, log
20 from allmydata.util.encodingutil import get_filesystem_encoding
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.stats import StatsProvider
24 from allmydata.history import History
25 from allmydata.interfaces import IStatsProducer, RIStubClient, \
26                                  SDMF_VERSION, MDMF_VERSION
27 from allmydata.nodemaker import NodeMaker
28
29
# Binary (power-of-two) size units, expressed as plain integers.
KiB = 1 << 10
MiB = KiB << 10
GiB = MiB << 10
TiB = GiB << 10
PiB = TiB << 10
35
class StubClient(Referenceable):
    """A Referenceable that declares RIStubClient and defines nothing else.

    Client.init_stub_client() registers an instance of this class with the
    Tub and publishes the resulting FURL through the introducer client.
    """
    implements(RIStubClient)
38
def _make_secret():
    """Return a fresh random secret as a base32 string ending in a newline.

    hashutil.CRYPTO_VAL_SIZE bytes are read from os.urandom() and encoded
    with base32.b2a(); a trailing "\\n" is appended to the encoded text.
    """
    raw_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(raw_bytes)
    return encoded + "\n"
41
class SecretHolder:
    """Hold the lease secret and the convergence secret of one client.

    The renewal and cancel secrets are not stored; they are derived from the
    lease secret by hashutil each time they are requested.
    """

    def __init__(self, lease_secret, convergence_secret):
        """Remember both secrets exactly as given."""
        self._convergence_secret = convergence_secret
        self._lease_secret = lease_secret

    def get_renewal_secret(self):
        """Return hashutil.my_renewal_secret_hash() of the lease secret."""
        lease_secret = self._lease_secret
        return hashutil.my_renewal_secret_hash(lease_secret)

    def get_cancel_secret(self):
        """Return hashutil.my_cancel_secret_hash() of the lease secret."""
        lease_secret = self._lease_secret
        return hashutil.my_cancel_secret_hash(lease_secret)

    def get_convergence_secret(self):
        """Return the convergence secret passed to the constructor."""
        return self._convergence_secret
55
class KeyGenerator:
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair. The keysize is taken from the keysize= argument to
    generate() if given, otherwise from the default installed by
    set_default_keysize(), otherwise from the built-in default of 2048
    bits."""

    def __init__(self):
        self.default_keysize = 2048
        # no remote key generator until set_remote_generator() provides one
        self._remote = None

    def set_remote_generator(self, keygen):
        """Use 'keygen' for subsequent generate() calls. A false value (such
        as None) makes generate() create keys locally again."""
        self._remote = keygen

    def set_default_keysize(self, keysize):
        """Override the size (in bits) of the RSA keys created by calls to
        generate() that do not pass a keysize= argument. All subsequent such
        calls are affected. The built-in default is 2048 bits. Test cases
        should call this once during setup to get smaller keys, so the unit
        tests run faster."""
        self.default_keysize = keysize

    def generate(self, keysize=None):
        """I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. 'keysize' is in bits (2048 bit keys are standard, smaller keys
        are used for testing). When 'keysize' is omitted (or false) I use my
        default, which is 2048 bits unless set_default_keysize() has changed
        it."""
        if not keysize:
            keysize = self.default_keysize

        remote = self._remote
        if not remote:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signing_key = rsa.generate(keysize)
            verifying_key = signing_key.get_verifying_key()
            return defer.succeed( (verifying_key, signing_key) )

        def _strings_to_key_objs(key_strings):
            # the remote generator answers with a pair of serialized keys
            (verifying_key_s, signing_key_s) = key_strings
            v = rsa.create_verifying_key_from_string(verifying_key_s)
            s = rsa.create_signing_key_from_string(signing_key_s)
            return v, s

        d = remote.callRemote('get_rsa_key_pair', keysize)
        d.addCallback(_strings_to_key_objs)
        return d
98
class Terminator(service.Service):
    """A service that calls stop() on every registered object when it is
    stopped.

    Registered objects are held only through weak references (they are the
    keys of a WeakKeyDictionary), so registering an object here does not by
    itself keep it alive.
    """

    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Remember 'c' (weakly) so that stopService() will call c.stop()."""
        self._clients[c] = None

    def stopService(self):
        """Call stop() on each still-alive registered object, then defer to
        service.Service.stopService() and return its result."""
        # Iterate over a snapshot of the keys rather than over the
        # WeakKeyDictionary itself: a stop() call may release the last
        # reference to another registered object, and an entry vanishing
        # while the dictionary is being iterated raises RuntimeError.
        for c in list(self._clients.keys()):
            c.stop()
        return service.Service.stopService(self)
108
109
110 class Client(node.Node, pollmixin.PollMixin):
111     implements(IStatsProducer)
112
113     PORTNUMFILE = "client.port"
114     STOREDIR = 'storage'
115     NODETYPE = "client"
116     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
117
118     # This means that if a storage server treats me as though I were a
119     # 1.0.0 storage client, it will work as they expect.
120     OLDEST_SUPPORTED_VERSION = "1.0.0"
121
    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total) and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
127     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
128                                    "happy": 7,
129                                    "n": 10,
130                                    "max_segment_size": 128*KiB,
131                                    }
132
133     def __init__(self, basedir="."):
134         node.Node.__init__(self, basedir)
135         self.started_timestamp = time.time()
136         self.logSource="Client"
137         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
138         self.init_introducer_client()
139         self.init_stats_provider()
140         self.init_secrets()
141         self.init_storage()
142         self.init_control()
143         self.helper = None
144         if self.get_config("helper", "enabled", False, boolean=True):
145             self.init_helper()
146         self._key_generator = KeyGenerator()
147         key_gen_furl = self.get_config("client", "key_generator.furl", None)
148         if key_gen_furl:
149             self.init_key_gen(key_gen_furl)
150         self.init_client()
151         # ControlServer and Helper are attached after Tub startup
152         self.init_ftp_server()
153         self.init_sftp_server()
154         self.init_drop_uploader()
155
156         hotline_file = os.path.join(self.basedir,
157                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
158         if os.path.exists(hotline_file):
159             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
160             self.log("hotline file noticed (%ds old), starting timer" % age)
161             hotline = TimerService(1.0, self._check_hotline, hotline_file)
162             hotline.setServiceParent(self)
163
164         # this needs to happen last, so it can use getServiceNamed() to
165         # acquire references to StorageServer and other web-statusable things
166         webport = self.get_config("node", "web.port", None)
167         if webport:
168             self.init_web(webport) # strports string
169
170     def init_introducer_client(self):
171         self.introducer_furl = self.get_config("client", "introducer.furl")
172         ic = IntroducerClient(self.tub, self.introducer_furl,
173                               self.nickname,
174                               str(allmydata.__full_version__),
175                               str(self.OLDEST_SUPPORTED_VERSION))
176         self.introducer_client = ic
177         # hold off on starting the IntroducerClient until our tub has been
178         # started, so we'll have a useful address on our RemoteReference, so
179         # that the introducer's status page will show us.
180         d = self.when_tub_ready()
181         def _start_introducer_client(res):
182             ic.setServiceParent(self)
183         d.addCallback(_start_introducer_client)
184         d.addErrback(log.err, facility="tahoe.init",
185                      level=log.BAD, umid="URyI5w")
186
187     def init_stats_provider(self):
188         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
189         self.stats_provider = StatsProvider(self, gatherer_furl)
190         self.add_service(self.stats_provider)
191         self.stats_provider.register_producer(self)
192
193     def get_stats(self):
194         return { 'node.uptime': time.time() - self.started_timestamp }
195
196     def init_secrets(self):
197         lease_s = self.get_or_create_private_config("secret", _make_secret)
198         lease_secret = base32.a2b(lease_s)
199         convergence_s = self.get_or_create_private_config('convergence',
200                                                           _make_secret)
201         self.convergence = base32.a2b(convergence_s)
202         self._secret_holder = SecretHolder(lease_secret, self.convergence)
203
204     def init_storage(self):
205         # should we run a storage server (and publish it for others to use)?
206         if not self.get_config("storage", "enabled", True, boolean=True):
207             return
208         readonly = self.get_config("storage", "readonly", False, boolean=True)
209
210         storedir = os.path.join(self.basedir, self.STOREDIR)
211
212         data = self.get_config("storage", "reserved_space", None)
213         reserved = None
214         try:
215             reserved = parse_abbreviated_size(data)
216         except ValueError:
217             log.msg("[storage]reserved_space= contains unparseable value %s"
218                     % data)
219         if reserved is None:
220             reserved = 0
221         discard = self.get_config("storage", "debug_discard", False,
222                                   boolean=True)
223
224         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
225         if expire:
226             mode = self.get_config("storage", "expire.mode") # require a mode
227         else:
228             mode = self.get_config("storage", "expire.mode", "age")
229
230         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
231         if o_l_d is not None:
232             o_l_d = parse_duration(o_l_d)
233
234         cutoff_date = None
235         if mode == "cutoff-date":
236             cutoff_date = self.get_config("storage", "expire.cutoff_date")
237             cutoff_date = parse_date(cutoff_date)
238
239         sharetypes = []
240         if self.get_config("storage", "expire.immutable", True, boolean=True):
241             sharetypes.append("immutable")
242         if self.get_config("storage", "expire.mutable", True, boolean=True):
243             sharetypes.append("mutable")
244         expiration_sharetypes = tuple(sharetypes)
245
246         ss = StorageServer(storedir, self.nodeid,
247                            reserved_space=reserved,
248                            discard_storage=discard,
249                            readonly_storage=readonly,
250                            stats_provider=self.stats_provider,
251                            expiration_enabled=expire,
252                            expiration_mode=mode,
253                            expiration_override_lease_duration=o_l_d,
254                            expiration_cutoff_date=cutoff_date,
255                            expiration_sharetypes=expiration_sharetypes)
256         self.add_service(ss)
257
258         d = self.when_tub_ready()
259         # we can't do registerReference until the Tub is ready
260         def _publish(res):
261             furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
262             furl = self.tub.registerReference(ss, furlFile=furl_file)
263             ri_name = RIStorageServer.__remote_name__
264             self.introducer_client.publish(furl, "storage", ri_name)
265         d.addCallback(_publish)
266         d.addErrback(log.err, facility="tahoe.init",
267                      level=log.BAD, umid="aLGBKw")
268
269     def init_client(self):
270         helper_furl = self.get_config("client", "helper.furl", None)
271         DEP = self.DEFAULT_ENCODING_PARAMETERS
272         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
273         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
274         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
275
276         self.init_client_storage_broker()
277         self.history = History(self.stats_provider)
278         self.terminator = Terminator()
279         self.terminator.setServiceParent(self)
280         self.add_service(Uploader(helper_furl, self.stats_provider))
281         self.init_stub_client()
282         self.init_nodemaker()
283
284     def init_client_storage_broker(self):
285         # create a StorageFarmBroker object, for use by Uploader/Downloader
286         # (and everybody else who wants to use storage servers)
287         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
288         self.storage_broker = sb
289
290         # load static server specifications from tahoe.cfg, if any.
291         # Not quite ready yet.
292         #if self.config.has_section("client-server-selection"):
293         #    server_params = {} # maps serverid to dict of parameters
294         #    for (name, value) in self.config.items("client-server-selection"):
295         #        pieces = name.split(".")
296         #        if pieces[0] == "server":
297         #            serverid = pieces[1]
298         #            if serverid not in server_params:
299         #                server_params[serverid] = {}
300         #            server_params[serverid][pieces[2]] = value
301         #    for serverid, params in server_params.items():
302         #        server_type = params.pop("type")
303         #        if server_type == "tahoe-foolscap":
304         #            s = storage_client.NativeStorageClient(*params)
305         #        else:
306         #            msg = ("unrecognized server type '%s' in "
307         #                   "tahoe.cfg [client-server-selection]server.%s.type"
308         #                   % (server_type, serverid))
309         #            raise storage_client.UnknownServerTypeError(msg)
310         #        sb.add_server(s.serverid, s)
311
312         # check to see if we're supposed to use the introducer too
313         if self.get_config("client-server-selection", "use_introducer",
314                            default=True, boolean=True):
315             sb.use_introducer(self.introducer_client)
316
317     def get_storage_broker(self):
318         return self.storage_broker
319
320     def init_stub_client(self):
321         def _publish(res):
322             # we publish an empty object so that the introducer can count how
323             # many clients are connected and see what versions they're
324             # running.
325             sc = StubClient()
326             furl = self.tub.registerReference(sc)
327             ri_name = RIStubClient.__remote_name__
328             self.introducer_client.publish(furl, "stub_client", ri_name)
329         d = self.when_tub_ready()
330         d.addCallback(_publish)
331         d.addErrback(log.err, facility="tahoe.init",
332                      level=log.BAD, umid="OEHq3g")
333
334     def init_nodemaker(self):
335         self.nodemaker = NodeMaker(self.storage_broker,
336                                    self._secret_holder,
337                                    self.get_history(),
338                                    self.getServiceNamed("uploader"),
339                                    self.terminator,
340                                    self.get_encoding_parameters(),
341                                    self._key_generator)
342         default = self.get_config("client", "mutable.format", default="sdmf")
343         if default == "mdmf":
344             self.mutable_file_default = MDMF_VERSION
345         else:
346             self.mutable_file_default = SDMF_VERSION
347
348     def get_history(self):
349         return self.history
350
351     def init_control(self):
352         d = self.when_tub_ready()
353         def _publish(res):
354             c = ControlServer()
355             c.setServiceParent(self)
356             control_url = self.tub.registerReference(c)
357             self.write_private_config("control.furl", control_url + "\n")
358         d.addCallback(_publish)
359         d.addErrback(log.err, facility="tahoe.init",
360                      level=log.BAD, umid="d3tNXA")
361
362     def init_helper(self):
363         d = self.when_tub_ready()
364         def _publish(self):
365             self.helper = Helper(os.path.join(self.basedir, "helper"),
366                                  self.storage_broker, self._secret_holder,
367                                  self.stats_provider, self.history)
368             # TODO: this is confusing. BASEDIR/private/helper.furl is created
369             # by the helper. BASEDIR/helper.furl is consumed by the client
370             # who wants to use the helper. I like having the filename be the
371             # same, since that makes 'cp' work smoothly, but the difference
372             # between config inputs and generated outputs is hard to see.
373             helper_furlfile = os.path.join(self.basedir,
374                                            "private", "helper.furl").encode(get_filesystem_encoding())
375             self.tub.registerReference(self.helper, furlFile=helper_furlfile)
376         d.addCallback(_publish)
377         d.addErrback(log.err, facility="tahoe.init",
378                      level=log.BAD, umid="K0mW5w")
379
380     def init_key_gen(self, key_gen_furl):
381         d = self.when_tub_ready()
382         def _subscribe(self):
383             self.tub.connectTo(key_gen_furl, self._got_key_generator)
384         d.addCallback(_subscribe)
385         d.addErrback(log.err, facility="tahoe.init",
386                      level=log.BAD, umid="z9DMzw")
387
388     def _got_key_generator(self, key_generator):
389         self._key_generator.set_remote_generator(key_generator)
390         key_generator.notifyOnDisconnect(self._lost_key_generator)
391
392     def _lost_key_generator(self):
393         self._key_generator.set_remote_generator(None)
394
395     def set_default_mutable_keysize(self, keysize):
396         self._key_generator.set_default_keysize(keysize)
397
398     def init_web(self, webport):
399         self.log("init_web(webport=%s)", args=(webport,))
400
401         from allmydata.webish import WebishServer
402         nodeurl_path = os.path.join(self.basedir, "node.url")
403         staticdir = self.get_config("node", "web.static", "public_html")
404         staticdir = os.path.expanduser(staticdir)
405         ws = WebishServer(self, webport, nodeurl_path, staticdir)
406         self.add_service(ws)
407
408     def init_ftp_server(self):
409         if self.get_config("ftpd", "enabled", False, boolean=True):
410             accountfile = self.get_config("ftpd", "accounts.file", None)
411             accounturl = self.get_config("ftpd", "accounts.url", None)
412             ftp_portstr = self.get_config("ftpd", "port", "8021")
413
414             from allmydata.frontends import ftpd
415             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
416             s.setServiceParent(self)
417
418     def init_sftp_server(self):
419         if self.get_config("sftpd", "enabled", False, boolean=True):
420             accountfile = self.get_config("sftpd", "accounts.file", None)
421             accounturl = self.get_config("sftpd", "accounts.url", None)
422             sftp_portstr = self.get_config("sftpd", "port", "8022")
423             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
424             privkey_file = self.get_config("sftpd", "host_privkey_file")
425
426             from allmydata.frontends import sftpd
427             s = sftpd.SFTPServer(self, accountfile, accounturl,
428                                  sftp_portstr, pubkey_file, privkey_file)
429             s.setServiceParent(self)
430
431     def init_drop_uploader(self):
432         if self.get_config("drop_upload", "enabled", False, boolean=True):
433             upload_dircap = self.get_config("drop_upload", "upload.dircap", None)
434             local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
435
436             if upload_dircap and local_dir_utf8:
437                 try:
438                     from allmydata.frontends import drop_upload
439                     s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
440                     s.setServiceParent(self)
441                     s.startService()
442                 except Exception, e:
443                     self.log("couldn't start drop-uploader: %r", args=(e,))
444             else:
445                 self.log("couldn't start drop-uploader: upload.dircap or local.directory not specified")
446
447     def _check_hotline(self, hotline_file):
448         if os.path.exists(hotline_file):
449             mtime = os.stat(hotline_file)[stat.ST_MTIME]
450             if mtime > time.time() - 120.0:
451                 return
452             else:
453                 self.log("hotline file too old, shutting down")
454         else:
455             self.log("hotline file missing, shutting down")
456         reactor.stop()
457
458     def get_encoding_parameters(self):
459         return self.DEFAULT_ENCODING_PARAMETERS
460
461     def connected_to_introducer(self):
462         if self.introducer_client:
463             return self.introducer_client.connected_to_introducer()
464         return False
465
466     def get_renewal_secret(self): # this will go away
467         return self._secret_holder.get_renewal_secret()
468
469     def get_cancel_secret(self):
470         return self._secret_holder.get_cancel_secret()
471
472     def debug_wait_for_client_connections(self, num_clients):
473         """Return a Deferred that fires (with None) when we have connections
474         to the given number of peers. Useful for tests that set up a
475         temporary test network and need to know when it is safe to proceed
476         with an upload or download."""
477         def _check():
478             return len(self.storage_broker.get_connected_servers()) >= num_clients
479         d = self.poll(_check, 0.5)
480         d.addCallback(lambda res: None)
481         return d
482
483
484     # these four methods are the primitives for creating filenodes and
485     # dirnodes. The first takes a URI and produces a filenode or (new-style)
486     # dirnode. The other three create brand-new filenodes/dirnodes.
487
488     def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
489         # This returns synchronously.
490         # Note that it does *not* validate the write_uri and read_uri; instead we
491         # may get an opaque node if there were any problems.
492         return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
493
494     def create_dirnode(self, initial_children={}, version=SDMF_VERSION):
495         d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
496         return d
497
498     def create_immutable_dirnode(self, children, convergence=None):
499         return self.nodemaker.create_immutable_directory(children, convergence)
500
501     def create_mutable_file(self, contents=None, keysize=None, version=None):
502         if not version:
503             version = self.mutable_file_default
504         return self.nodemaker.create_mutable_file(contents, keysize,
505                                                   version=version)
506
507     def upload(self, uploadable):
508         uploader = self.getServiceNamed("uploader")
509         return uploader.upload(uploadable, history=self.get_history())