]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
create_node_from_uri: take both writecap+readcap, move logic out of dirnode.py
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
30
31 KiB=1024
32 MiB=1024*KiB
33 GiB=1024*MiB
34 TiB=1024*GiB
35 PiB=1024*TiB
36
37 class StubClient(Referenceable):
38     implements(RIStubClient)
39
40 def _make_secret():
41     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
42
43 class Client(node.Node, pollmixin.PollMixin):
44     implements(IStatsProducer)
45
46     PORTNUMFILE = "client.port"
47     STOREDIR = 'storage'
48     NODETYPE = "client"
49     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
50
51     # This means that if a storage server treats me as though I were a
52     # 1.0.0 storage client, it will work as they expect.
53     OLDEST_SUPPORTED_VERSION = "1.0.0"
54
55     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
56     # is the number of shares required to reconstruct a file. 'desired' means
57     # that we will abort an upload unless we can allocate space for at least
58     # this many. 'total' is the total number of shares created by encoding.
59     # If everybody has room then this is is how many we will upload.
60     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
61                                    "happy": 7,
62                                    "n": 10,
63                                    "max_segment_size": 128*KiB,
64                                    }
65
66     # set this to override the size of the RSA keys created for new mutable
67     # files. The default of None means to let mutable.filenode choose its own
68     # size, which means 2048 bits.
69     DEFAULT_MUTABLE_KEYSIZE = None
70
71     def __init__(self, basedir="."):
72         node.Node.__init__(self, basedir)
73         self.started_timestamp = time.time()
74         self.logSource="Client"
75         self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
76         self.init_introducer_client()
77         self.init_stats_provider()
78         self.init_lease_secret()
79         self.init_storage()
80         self.init_control()
81         if self.get_config("helper", "enabled", False, boolean=True):
82             self.init_helper()
83         self.init_client()
84         self._key_generator = None
85         key_gen_furl = self.get_config("client", "key_generator.furl", None)
86         if key_gen_furl:
87             self.init_key_gen(key_gen_furl)
88         # ControlServer and Helper are attached after Tub startup
89         self.init_ftp_server()
90         self.init_sftp_server()
91
92         hotline_file = os.path.join(self.basedir,
93                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
94         if os.path.exists(hotline_file):
95             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
96             self.log("hotline file noticed (%ds old), starting timer" % age)
97             hotline = TimerService(1.0, self._check_hotline, hotline_file)
98             hotline.setServiceParent(self)
99
100         # this needs to happen last, so it can use getServiceNamed() to
101         # acquire references to StorageServer and other web-statusable things
102         webport = self.get_config("node", "web.port", None)
103         if webport:
104             self.init_web(webport) # strports string
105
106     def read_old_config_files(self):
107         node.Node.read_old_config_files(self)
108         copy = self._copy_config_from_file
109         copy("introducer.furl", "client", "introducer.furl")
110         copy("helper.furl", "client", "helper.furl")
111         copy("key_generator.furl", "client", "key_generator.furl")
112         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
113         if os.path.exists(os.path.join(self.basedir, "no_storage")):
114             self.set_config("storage", "enabled", "false")
115         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
116             self.set_config("storage", "readonly", "true")
117         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
118             self.set_config("storage", "debug_discard", "true")
119         if os.path.exists(os.path.join(self.basedir, "run_helper")):
120             self.set_config("helper", "enabled", "true")
121
122     def init_introducer_client(self):
123         self.introducer_furl = self.get_config("client", "introducer.furl")
124         ic = IntroducerClient(self.tub, self.introducer_furl,
125                               self.nickname,
126                               str(allmydata.__full_version__),
127                               str(self.OLDEST_SUPPORTED_VERSION))
128         self.introducer_client = ic
129         # hold off on starting the IntroducerClient until our tub has been
130         # started, so we'll have a useful address on our RemoteReference, so
131         # that the introducer's status page will show us.
132         d = self.when_tub_ready()
133         def _start_introducer_client(res):
134             ic.setServiceParent(self)
135         d.addCallback(_start_introducer_client)
136         d.addErrback(log.err, facility="tahoe.init",
137                      level=log.BAD, umid="URyI5w")
138
139     def init_stats_provider(self):
140         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
141         self.stats_provider = StatsProvider(self, gatherer_furl)
142         self.add_service(self.stats_provider)
143         self.stats_provider.register_producer(self)
144
145     def get_stats(self):
146         return { 'node.uptime': time.time() - self.started_timestamp }
147
148     def init_lease_secret(self):
149         secret_s = self.get_or_create_private_config("secret", _make_secret)
150         self._lease_secret = base32.a2b(secret_s)
151
152     def init_storage(self):
153         # should we run a storage server (and publish it for others to use)?
154         if not self.get_config("storage", "enabled", True, boolean=True):
155             return
156         readonly = self.get_config("storage", "readonly", False, boolean=True)
157
158         storedir = os.path.join(self.basedir, self.STOREDIR)
159
160         data = self.get_config("storage", "reserved_space", None)
161         reserved = None
162         try:
163             reserved = parse_abbreviated_size(data)
164         except ValueError:
165             log.msg("[storage]reserved_space= contains unparseable value %s"
166                     % data)
167         if reserved is None:
168             reserved = 0
169         discard = self.get_config("storage", "debug_discard", False,
170                                   boolean=True)
171
172         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
173         if expire:
174             mode = self.get_config("storage", "expire.mode") # require a mode
175         else:
176             mode = self.get_config("storage", "expire.mode", "age")
177
178         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
179         if o_l_d is not None:
180             o_l_d = parse_duration(o_l_d)
181
182         cutoff_date = None
183         if mode == "cutoff-date":
184             cutoff_date = self.get_config("storage", "expire.cutoff_date")
185             cutoff_date = parse_date(cutoff_date)
186
187         sharetypes = []
188         if self.get_config("storage", "expire.immutable", True, boolean=True):
189             sharetypes.append("immutable")
190         if self.get_config("storage", "expire.mutable", True, boolean=True):
191             sharetypes.append("mutable")
192         expiration_sharetypes = tuple(sharetypes)
193
194         ss = StorageServer(storedir, self.nodeid,
195                            reserved_space=reserved,
196                            discard_storage=discard,
197                            readonly_storage=readonly,
198                            stats_provider=self.stats_provider,
199                            expiration_enabled=expire,
200                            expiration_mode=mode,
201                            expiration_override_lease_duration=o_l_d,
202                            expiration_cutoff_date=cutoff_date,
203                            expiration_sharetypes=expiration_sharetypes)
204         self.add_service(ss)
205
206         d = self.when_tub_ready()
207         # we can't do registerReference until the Tub is ready
208         def _publish(res):
209             furl_file = os.path.join(self.basedir, "private", "storage.furl")
210             furl = self.tub.registerReference(ss, furlFile=furl_file)
211             ri_name = RIStorageServer.__remote_name__
212             self.introducer_client.publish(furl, "storage", ri_name)
213         d.addCallback(_publish)
214         d.addErrback(log.err, facility="tahoe.init",
215                      level=log.BAD, umid="aLGBKw")
216
217     def init_client(self):
218         helper_furl = self.get_config("client", "helper.furl", None)
219         DEP = self.DEFAULT_ENCODING_PARAMETERS
220         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
221         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
222         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
223         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
224         self.convergence = base32.a2b(convergence_s)
225         self._node_cache = weakref.WeakValueDictionary() # uri -> node
226
227         self.init_client_storage_broker()
228         self.add_service(History(self.stats_provider))
229         self.add_service(Uploader(helper_furl, self.stats_provider))
230         download_cachedir = os.path.join(self.basedir,
231                                          "private", "cache", "download")
232         self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
233         self.download_cache.setServiceParent(self)
234         self.add_service(Downloader(self.stats_provider))
235         self.init_stub_client()
236
237     def init_client_storage_broker(self):
238         # create a StorageFarmBroker object, for use by Uploader/Downloader
239         # (and everybody else who wants to use storage servers)
240         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
241         self.storage_broker = sb
242
243         # load static server specifications from tahoe.cfg, if any.
244         # Not quite ready yet.
245         #if self.config.has_section("client-server-selection"):
246         #    server_params = {} # maps serverid to dict of parameters
247         #    for (name, value) in self.config.items("client-server-selection"):
248         #        pieces = name.split(".")
249         #        if pieces[0] == "server":
250         #            serverid = pieces[1]
251         #            if serverid not in server_params:
252         #                server_params[serverid] = {}
253         #            server_params[serverid][pieces[2]] = value
254         #    for serverid, params in server_params.items():
255         #        server_type = params.pop("type")
256         #        if server_type == "tahoe-foolscap":
257         #            s = storage_client.NativeStorageClient(*params)
258         #        else:
259         #            msg = ("unrecognized server type '%s' in "
260         #                   "tahoe.cfg [client-server-selection]server.%s.type"
261         #                   % (server_type, serverid))
262         #            raise storage_client.UnknownServerTypeError(msg)
263         #        sb.add_server(s.serverid, s)
264
265         # check to see if we're supposed to use the introducer too
266         if self.get_config("client-server-selection", "use_introducer",
267                            default=True, boolean=True):
268             sb.use_introducer(self.introducer_client)
269
270     def get_storage_broker(self):
271         return self.storage_broker
272
273     def init_stub_client(self):
274         def _publish(res):
275             # we publish an empty object so that the introducer can count how
276             # many clients are connected and see what versions they're
277             # running.
278             sc = StubClient()
279             furl = self.tub.registerReference(sc)
280             ri_name = RIStubClient.__remote_name__
281             self.introducer_client.publish(furl, "stub_client", ri_name)
282         d = self.when_tub_ready()
283         d.addCallback(_publish)
284         d.addErrback(log.err, facility="tahoe.init",
285                      level=log.BAD, umid="OEHq3g")
286
287     def get_history(self):
288         return self.getServiceNamed("history")
289
290     def init_control(self):
291         d = self.when_tub_ready()
292         def _publish(res):
293             c = ControlServer()
294             c.setServiceParent(self)
295             control_url = self.tub.registerReference(c)
296             self.write_private_config("control.furl", control_url + "\n")
297         d.addCallback(_publish)
298         d.addErrback(log.err, facility="tahoe.init",
299                      level=log.BAD, umid="d3tNXA")
300
301     def init_helper(self):
302         d = self.when_tub_ready()
303         def _publish(self):
304             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
305             h.setServiceParent(self)
306             # TODO: this is confusing. BASEDIR/private/helper.furl is created
307             # by the helper. BASEDIR/helper.furl is consumed by the client
308             # who wants to use the helper. I like having the filename be the
309             # same, since that makes 'cp' work smoothly, but the difference
310             # between config inputs and generated outputs is hard to see.
311             helper_furlfile = os.path.join(self.basedir,
312                                            "private", "helper.furl")
313             self.tub.registerReference(h, furlFile=helper_furlfile)
314         d.addCallback(_publish)
315         d.addErrback(log.err, facility="tahoe.init",
316                      level=log.BAD, umid="K0mW5w")
317
318     def init_key_gen(self, key_gen_furl):
319         d = self.when_tub_ready()
320         def _subscribe(self):
321             self.tub.connectTo(key_gen_furl, self._got_key_generator)
322         d.addCallback(_subscribe)
323         d.addErrback(log.err, facility="tahoe.init",
324                      level=log.BAD, umid="z9DMzw")
325
326     def _got_key_generator(self, key_generator):
327         self._key_generator = key_generator
328         key_generator.notifyOnDisconnect(self._lost_key_generator)
329
330     def _lost_key_generator(self):
331         self._key_generator = None
332
333     def init_web(self, webport):
334         self.log("init_web(webport=%s)", args=(webport,))
335
336         from allmydata.webish import WebishServer
337         nodeurl_path = os.path.join(self.basedir, "node.url")
338         staticdir = self.get_config("node", "web.static", "public_html")
339         staticdir = os.path.expanduser(staticdir)
340         ws = WebishServer(self, webport, nodeurl_path, staticdir)
341         self.add_service(ws)
342
343     def init_ftp_server(self):
344         if self.get_config("ftpd", "enabled", False, boolean=True):
345             accountfile = self.get_config("ftpd", "accounts.file", None)
346             accounturl = self.get_config("ftpd", "accounts.url", None)
347             ftp_portstr = self.get_config("ftpd", "port", "8021")
348
349             from allmydata.frontends import ftpd
350             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
351             s.setServiceParent(self)
352
353     def init_sftp_server(self):
354         if self.get_config("sftpd", "enabled", False, boolean=True):
355             accountfile = self.get_config("sftpd", "accounts.file", None)
356             accounturl = self.get_config("sftpd", "accounts.url", None)
357             sftp_portstr = self.get_config("sftpd", "port", "8022")
358             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
359             privkey_file = self.get_config("sftpd", "host_privkey_file")
360
361             from allmydata.frontends import sftpd
362             s = sftpd.SFTPServer(self, accountfile, accounturl,
363                                  sftp_portstr, pubkey_file, privkey_file)
364             s.setServiceParent(self)
365
366     def _check_hotline(self, hotline_file):
367         if os.path.exists(hotline_file):
368             mtime = os.stat(hotline_file)[stat.ST_MTIME]
369             if mtime > time.time() - 120.0:
370                 return
371             else:
372                 self.log("hotline file too old, shutting down")
373         else:
374             self.log("hotline file missing, shutting down")
375         reactor.stop()
376
377     def get_encoding_parameters(self):
378         return self.DEFAULT_ENCODING_PARAMETERS
379
380     def connected_to_introducer(self):
381         if self.introducer_client:
382             return self.introducer_client.connected_to_introducer()
383         return False
384
385     def get_renewal_secret(self):
386         return hashutil.my_renewal_secret_hash(self._lease_secret)
387
388     def get_cancel_secret(self):
389         return hashutil.my_cancel_secret_hash(self._lease_secret)
390
391     def debug_wait_for_client_connections(self, num_clients):
392         """Return a Deferred that fires (with None) when we have connections
393         to the given number of peers. Useful for tests that set up a
394         temporary test network and need to know when it is safe to proceed
395         with an upload or download."""
396         def _check():
397             return len(self.storage_broker.get_all_servers()) >= num_clients
398         d = self.poll(_check, 0.5)
399         d.addCallback(lambda res: None)
400         return d
401
402
403     # these four methods are the primitives for creating filenodes and
404     # dirnodes. The first takes a URI and produces a filenode or (new-style)
405     # dirnode. The other three create brand-new filenodes/dirnodes.
406
407     def create_node_from_uri(self, u, readcap=None):
408         # this returns synchronously.
409         if not u:
410             u = readcap
411         u = IURI(u)
412         u_s = u.to_string()
413         if u_s not in self._node_cache:
414             if IReadonlyNewDirectoryURI.providedBy(u):
415                 # new-style read-only dirnodes
416                 node = NewDirectoryNode(self).init_from_uri(u)
417             elif INewDirectoryURI.providedBy(u):
418                 # new-style dirnodes
419                 node = NewDirectoryNode(self).init_from_uri(u)
420             elif IFileURI.providedBy(u):
421                 if isinstance(u, LiteralFileURI):
422                     node = LiteralFileNode(u, self) # LIT
423                 else:
424                     key = base32.b2a(u.storage_index)
425                     cachefile = self.download_cache.get_file(key)
426                     node = FileNode(u, self, cachefile) # CHK
427             else:
428                 assert IMutableFileURI.providedBy(u), u
429                 node = MutableFileNode(self).init_from_uri(u)
430             self._node_cache[u_s] = node
431         return self._node_cache[u_s]
432
433     def create_empty_dirnode(self):
434         n = NewDirectoryNode(self)
435         d = n.create(self._generate_pubprivkeys, self.DEFAULT_MUTABLE_KEYSIZE)
436         d.addCallback(lambda res: n)
437         return d
438
439     def create_mutable_file(self, contents="", keysize=None):
440         keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
441         n = MutableFileNode(self)
442         d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
443         d.addCallback(lambda res: n)
444         return d
445
446     def _generate_pubprivkeys(self, key_size):
447         if self._key_generator:
448             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
449             def make_key_objs((verifying_key, signing_key)):
450                 v = rsa.create_verifying_key_from_string(verifying_key)
451                 s = rsa.create_signing_key_from_string(signing_key)
452                 return v, s
453             d.addCallback(make_key_objs)
454             return d
455         else:
456             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
457             # secs
458             signer = rsa.generate(key_size)
459             verifier = signer.get_verifying_key()
460             return verifier, signer
461
462     def upload(self, uploadable):
463         uploader = self.getServiceNamed("uploader")
464         return uploader.upload(uploadable, history=self.get_history())
465
466
467     def list_all_upload_statuses(self):
468         return self.get_history().list_all_upload_statuses()
469
470     def list_all_download_statuses(self):
471         return self.get_history().list_all_download_statuses()
472
473     def list_all_mapupdate_statuses(self):
474         return self.get_history().list_all_mapupdate_statuses()
475     def list_all_publish_statuses(self):
476         return self.get_history().list_all_publish_statuses()
477     def list_all_retrieve_statuses(self):
478         return self.get_history().list_all_retrieve_statuses()
479
480     def list_all_helper_statuses(self):
481         try:
482             helper = self.getServiceNamed("helper")
483         except KeyError:
484             return []
485         return helper.get_all_upload_statuses()