]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
directories: rename internal data member download_cache to download_cache_dirman...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI, UnknownURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.unknown import UnknownNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
31
# Binary size units: each one is 1024 times the previous one.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
37
class StubClient(Referenceable):
    """Referenceable subclass that declares the RIStubClient interface.

    It adds no methods or state of its own.
    """
    implements(RIStubClient)
40
def _make_secret():
    """Return a new random secret as a base32 string ending in a newline.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes read from os.urandom().
    """
    raw_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(raw_bytes)
    return encoded + "\n"
43
44 class Client(node.Node, pollmixin.PollMixin):
45     implements(IStatsProducer)
46
47     PORTNUMFILE = "client.port"
48     STOREDIR = 'storage'
49     NODETYPE = "client"
50     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
51
52     # This means that if a storage server treats me as though I were a
53     # 1.0.0 storage client, it will work as they expect.
54     OLDEST_SUPPORTED_VERSION = "1.0.0"
55
    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total) and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
61     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
62                                    "happy": 7,
63                                    "n": 10,
64                                    "max_segment_size": 128*KiB,
65                                    }
66
67     # set this to override the size of the RSA keys created for new mutable
68     # files. The default of None means to let mutable.filenode choose its own
69     # size, which means 2048 bits.
70     DEFAULT_MUTABLE_KEYSIZE = None
71
    def __init__(self, basedir="."):
        """Set up the client node rooted at basedir.

        After the node.Node base initialisation, the init_*() methods below
        are called in a fixed order. The helper, the key generator and the
        web server are only set up when their configuration entry is
        present, and the hotline timer only when the hotline file exists.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # take a per-instance copy, so that init_client() can overwrite
        # entries without modifying the class-level dict
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        # stays None until a key generator connection is established (see
        # _got_key_generator)
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists, poll it once per second with
        # _check_hotline(), which stops the reactor when the file goes
        # missing or becomes too old
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
106
107     def read_old_config_files(self):
108         node.Node.read_old_config_files(self)
109         copy = self._copy_config_from_file
110         copy("introducer.furl", "client", "introducer.furl")
111         copy("helper.furl", "client", "helper.furl")
112         copy("key_generator.furl", "client", "key_generator.furl")
113         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
114         if os.path.exists(os.path.join(self.basedir, "no_storage")):
115             self.set_config("storage", "enabled", "false")
116         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
117             self.set_config("storage", "readonly", "true")
118         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
119             self.set_config("storage", "debug_discard", "true")
120         if os.path.exists(os.path.join(self.basedir, "run_helper")):
121             self.set_config("helper", "enabled", "true")
122
123     def init_introducer_client(self):
124         self.introducer_furl = self.get_config("client", "introducer.furl")
125         ic = IntroducerClient(self.tub, self.introducer_furl,
126                               self.nickname,
127                               str(allmydata.__full_version__),
128                               str(self.OLDEST_SUPPORTED_VERSION))
129         self.introducer_client = ic
130         # hold off on starting the IntroducerClient until our tub has been
131         # started, so we'll have a useful address on our RemoteReference, so
132         # that the introducer's status page will show us.
133         d = self.when_tub_ready()
134         def _start_introducer_client(res):
135             ic.setServiceParent(self)
136         d.addCallback(_start_introducer_client)
137         d.addErrback(log.err, facility="tahoe.init",
138                      level=log.BAD, umid="URyI5w")
139
140     def init_stats_provider(self):
141         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
142         self.stats_provider = StatsProvider(self, gatherer_furl)
143         self.add_service(self.stats_provider)
144         self.stats_provider.register_producer(self)
145
146     def get_stats(self):
147         return { 'node.uptime': time.time() - self.started_timestamp }
148
149     def init_lease_secret(self):
150         secret_s = self.get_or_create_private_config("secret", _make_secret)
151         self._lease_secret = base32.a2b(secret_s)
152
153     def init_storage(self):
154         # should we run a storage server (and publish it for others to use)?
155         if not self.get_config("storage", "enabled", True, boolean=True):
156             return
157         readonly = self.get_config("storage", "readonly", False, boolean=True)
158
159         storedir = os.path.join(self.basedir, self.STOREDIR)
160
161         data = self.get_config("storage", "reserved_space", None)
162         reserved = None
163         try:
164             reserved = parse_abbreviated_size(data)
165         except ValueError:
166             log.msg("[storage]reserved_space= contains unparseable value %s"
167                     % data)
168         if reserved is None:
169             reserved = 0
170         discard = self.get_config("storage", "debug_discard", False,
171                                   boolean=True)
172
173         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
174         if expire:
175             mode = self.get_config("storage", "expire.mode") # require a mode
176         else:
177             mode = self.get_config("storage", "expire.mode", "age")
178
179         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
180         if o_l_d is not None:
181             o_l_d = parse_duration(o_l_d)
182
183         cutoff_date = None
184         if mode == "cutoff-date":
185             cutoff_date = self.get_config("storage", "expire.cutoff_date")
186             cutoff_date = parse_date(cutoff_date)
187
188         sharetypes = []
189         if self.get_config("storage", "expire.immutable", True, boolean=True):
190             sharetypes.append("immutable")
191         if self.get_config("storage", "expire.mutable", True, boolean=True):
192             sharetypes.append("mutable")
193         expiration_sharetypes = tuple(sharetypes)
194
195         ss = StorageServer(storedir, self.nodeid,
196                            reserved_space=reserved,
197                            discard_storage=discard,
198                            readonly_storage=readonly,
199                            stats_provider=self.stats_provider,
200                            expiration_enabled=expire,
201                            expiration_mode=mode,
202                            expiration_override_lease_duration=o_l_d,
203                            expiration_cutoff_date=cutoff_date,
204                            expiration_sharetypes=expiration_sharetypes)
205         self.add_service(ss)
206
207         d = self.when_tub_ready()
208         # we can't do registerReference until the Tub is ready
209         def _publish(res):
210             furl_file = os.path.join(self.basedir, "private", "storage.furl")
211             furl = self.tub.registerReference(ss, furlFile=furl_file)
212             ri_name = RIStorageServer.__remote_name__
213             self.introducer_client.publish(furl, "storage", ri_name)
214         d.addCallback(_publish)
215         d.addErrback(log.err, facility="tahoe.init",
216                      level=log.BAD, umid="aLGBKw")
217
218     def init_client(self):
219         helper_furl = self.get_config("client", "helper.furl", None)
220         DEP = self.DEFAULT_ENCODING_PARAMETERS
221         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
222         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
223         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
224         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
225         self.convergence = base32.a2b(convergence_s)
226         self._node_cache = weakref.WeakValueDictionary() # uri -> node
227
228         self.init_client_storage_broker()
229         self.add_service(History(self.stats_provider))
230         self.add_service(Uploader(helper_furl, self.stats_provider))
231         download_cachedir = os.path.join(self.basedir,
232                                          "private", "cache", "download")
233         self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
234         self.download_cache_dirman.setServiceParent(self)
235         self.add_service(Downloader(self.stats_provider))
236         self.init_stub_client()
237
238     def init_client_storage_broker(self):
239         # create a StorageFarmBroker object, for use by Uploader/Downloader
240         # (and everybody else who wants to use storage servers)
241         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
242         self.storage_broker = sb
243
244         # load static server specifications from tahoe.cfg, if any.
245         # Not quite ready yet.
246         #if self.config.has_section("client-server-selection"):
247         #    server_params = {} # maps serverid to dict of parameters
248         #    for (name, value) in self.config.items("client-server-selection"):
249         #        pieces = name.split(".")
250         #        if pieces[0] == "server":
251         #            serverid = pieces[1]
252         #            if serverid not in server_params:
253         #                server_params[serverid] = {}
254         #            server_params[serverid][pieces[2]] = value
255         #    for serverid, params in server_params.items():
256         #        server_type = params.pop("type")
257         #        if server_type == "tahoe-foolscap":
258         #            s = storage_client.NativeStorageClient(*params)
259         #        else:
260         #            msg = ("unrecognized server type '%s' in "
261         #                   "tahoe.cfg [client-server-selection]server.%s.type"
262         #                   % (server_type, serverid))
263         #            raise storage_client.UnknownServerTypeError(msg)
264         #        sb.add_server(s.serverid, s)
265
266         # check to see if we're supposed to use the introducer too
267         if self.get_config("client-server-selection", "use_introducer",
268                            default=True, boolean=True):
269             sb.use_introducer(self.introducer_client)
270
    def get_storage_broker(self):
        """Return the StorageFarmBroker created by
        init_client_storage_broker()."""
        return self.storage_broker
273
274     def init_stub_client(self):
275         def _publish(res):
276             # we publish an empty object so that the introducer can count how
277             # many clients are connected and see what versions they're
278             # running.
279             sc = StubClient()
280             furl = self.tub.registerReference(sc)
281             ri_name = RIStubClient.__remote_name__
282             self.introducer_client.publish(furl, "stub_client", ri_name)
283         d = self.when_tub_ready()
284         d.addCallback(_publish)
285         d.addErrback(log.err, facility="tahoe.init",
286                      level=log.BAD, umid="OEHq3g")
287
    def get_history(self):
        """Return the child service named "history"."""
        return self.getServiceNamed("history")
290
291     def init_control(self):
292         d = self.when_tub_ready()
293         def _publish(res):
294             c = ControlServer()
295             c.setServiceParent(self)
296             control_url = self.tub.registerReference(c)
297             self.write_private_config("control.furl", control_url + "\n")
298         d.addCallback(_publish)
299         d.addErrback(log.err, facility="tahoe.init",
300                      level=log.BAD, umid="d3tNXA")
301
302     def init_helper(self):
303         d = self.when_tub_ready()
304         def _publish(self):
305             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
306             h.setServiceParent(self)
307             # TODO: this is confusing. BASEDIR/private/helper.furl is created
308             # by the helper. BASEDIR/helper.furl is consumed by the client
309             # who wants to use the helper. I like having the filename be the
310             # same, since that makes 'cp' work smoothly, but the difference
311             # between config inputs and generated outputs is hard to see.
312             helper_furlfile = os.path.join(self.basedir,
313                                            "private", "helper.furl")
314             self.tub.registerReference(h, furlFile=helper_furlfile)
315         d.addCallback(_publish)
316         d.addErrback(log.err, facility="tahoe.init",
317                      level=log.BAD, umid="K0mW5w")
318
319     def init_key_gen(self, key_gen_furl):
320         d = self.when_tub_ready()
321         def _subscribe(self):
322             self.tub.connectTo(key_gen_furl, self._got_key_generator)
323         d.addCallback(_subscribe)
324         d.addErrback(log.err, facility="tahoe.init",
325                      level=log.BAD, umid="z9DMzw")
326
    def _got_key_generator(self, key_generator):
        """Remember the connected key generator and watch for its loss.

        init_key_gen() passes this method to tub.connectTo().
        """
        self._key_generator = key_generator
        # _lost_key_generator() clears the reference again on disconnect
        key_generator.notifyOnDisconnect(self._lost_key_generator)
330
    def _lost_key_generator(self):
        """Forget the key generator; registered as its disconnect handler by
        _got_key_generator()."""
        self._key_generator = None
333
334     def init_web(self, webport):
335         self.log("init_web(webport=%s)", args=(webport,))
336
337         from allmydata.webish import WebishServer
338         nodeurl_path = os.path.join(self.basedir, "node.url")
339         staticdir = self.get_config("node", "web.static", "public_html")
340         staticdir = os.path.expanduser(staticdir)
341         ws = WebishServer(self, webport, nodeurl_path, staticdir)
342         self.add_service(ws)
343
344     def init_ftp_server(self):
345         if self.get_config("ftpd", "enabled", False, boolean=True):
346             accountfile = self.get_config("ftpd", "accounts.file", None)
347             accounturl = self.get_config("ftpd", "accounts.url", None)
348             ftp_portstr = self.get_config("ftpd", "port", "8021")
349
350             from allmydata.frontends import ftpd
351             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
352             s.setServiceParent(self)
353
354     def init_sftp_server(self):
355         if self.get_config("sftpd", "enabled", False, boolean=True):
356             accountfile = self.get_config("sftpd", "accounts.file", None)
357             accounturl = self.get_config("sftpd", "accounts.url", None)
358             sftp_portstr = self.get_config("sftpd", "port", "8022")
359             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
360             privkey_file = self.get_config("sftpd", "host_privkey_file")
361
362             from allmydata.frontends import sftpd
363             s = sftpd.SFTPServer(self, accountfile, accounturl,
364                                  sftp_portstr, pubkey_file, privkey_file)
365             s.setServiceParent(self)
366
367     def _check_hotline(self, hotline_file):
368         if os.path.exists(hotline_file):
369             mtime = os.stat(hotline_file)[stat.ST_MTIME]
370             if mtime > time.time() - 120.0:
371                 return
372             else:
373                 self.log("hotline file too old, shutting down")
374         else:
375             self.log("hotline file missing, shutting down")
376         reactor.stop()
377
    def get_encoding_parameters(self):
        """Return this node's encoding-parameter dict (the dict itself, not a
        copy)."""
        return self.DEFAULT_ENCODING_PARAMETERS
380
381     def connected_to_introducer(self):
382         if self.introducer_client:
383             return self.introducer_client.connected_to_introducer()
384         return False
385
386     def get_renewal_secret(self):
387         return hashutil.my_renewal_secret_hash(self._lease_secret)
388
389     def get_cancel_secret(self):
390         return hashutil.my_cancel_secret_hash(self._lease_secret)
391
392     def debug_wait_for_client_connections(self, num_clients):
393         """Return a Deferred that fires (with None) when we have connections
394         to the given number of peers. Useful for tests that set up a
395         temporary test network and need to know when it is safe to proceed
396         with an upload or download."""
397         def _check():
398             return len(self.storage_broker.get_all_servers()) >= num_clients
399         d = self.poll(_check, 0.5)
400         d.addCallback(lambda res: None)
401         return d
402
403
404     # these four methods are the primitives for creating filenodes and
405     # dirnodes. The first takes a URI and produces a filenode or (new-style)
406     # dirnode. The other three create brand-new filenodes/dirnodes.
407
408     def create_node_from_uri(self, writecap, readcap=None):
409         # this returns synchronously.
410         u = writecap or readcap
411         if not u:
412             # maybe the writecap was hidden because we're in a readonly
413             # directory, and the future cap format doesn't have a readcap, or
414             # something.
415             return UnknownNode(writecap, readcap)
416         u = IURI(u)
417         if isinstance(u, UnknownURI):
418             return UnknownNode(writecap, readcap)
419         u_s = u.to_string()
420         if u_s not in self._node_cache:
421             if IReadonlyNewDirectoryURI.providedBy(u):
422                 # new-style read-only dirnodes
423                 node = NewDirectoryNode(self).init_from_uri(u)
424             elif INewDirectoryURI.providedBy(u):
425                 # new-style dirnodes
426                 node = NewDirectoryNode(self).init_from_uri(u)
427             elif IFileURI.providedBy(u):
428                 if isinstance(u, LiteralFileURI):
429                     node = LiteralFileNode(u, self) # LIT
430                 else:
431                     key = base32.b2a(u.storage_index)
432                     cachefile = self.download_cache_dirman.get_file(key)
433                     node = FileNode(u, self, cachefile) # CHK
434             else:
435                 assert IMutableFileURI.providedBy(u), u
436                 node = MutableFileNode(self).init_from_uri(u)
437             self._node_cache[u_s] = node  # note: WeakValueDictionary
438         return self._node_cache[u_s]
439
440     def create_empty_dirnode(self):
441         d = self.create_mutable_file()
442         d.addCallback(NewDirectoryNode.create_with_mutablefile, self)
443         return d
444
445     def create_mutable_file(self, contents="", keysize=None):
446         keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
447         n = MutableFileNode(self)
448         d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
449         d.addCallback(lambda res: n)
450         return d
451
452     def _generate_pubprivkeys(self, key_size):
453         if self._key_generator:
454             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
455             def make_key_objs((verifying_key, signing_key)):
456                 v = rsa.create_verifying_key_from_string(verifying_key)
457                 s = rsa.create_signing_key_from_string(signing_key)
458                 return v, s
459             d.addCallback(make_key_objs)
460             return d
461         else:
462             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
463             # secs
464             signer = rsa.generate(key_size)
465             verifier = signer.get_verifying_key()
466             return verifier, signer
467
468     def upload(self, uploadable):
469         uploader = self.getServiceNamed("uploader")
470         return uploader.upload(uploadable, history=self.get_history())
471
472
473     def list_all_upload_statuses(self):
474         return self.get_history().list_all_upload_statuses()
475
476     def list_all_download_statuses(self):
477         return self.get_history().list_all_download_statuses()
478
479     def list_all_mapupdate_statuses(self):
480         return self.get_history().list_all_mapupdate_statuses()
481     def list_all_publish_statuses(self):
482         return self.get_history().list_all_publish_statuses()
483     def list_all_retrieve_statuses(self):
484         return self.get_history().list_all_retrieve_statuses()
485
486     def list_all_helper_statuses(self):
487         try:
488             helper = self.getServiceNamed("helper")
489         except KeyError:
490             return []
491         return helper.get_all_upload_statuses()