]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
mutable: move recent operation history management code (MutableWatcher) into history...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.immutable.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.util.abbreviate import parse_abbreviated_size
23 from allmydata.uri import LiteralFileURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.stats import StatsProvider
27 from allmydata.history import History
28 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
29      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
30
31 KiB=1024
32 MiB=1024*KiB
33 GiB=1024*MiB
34 TiB=1024*GiB
35 PiB=1024*TiB
36
class StubClient(Referenceable):
    # An empty Referenceable. init_client() publishes one of these to the
    # introducer so it can count connected clients and see what versions
    # they are running.
    implements(RIStubClient)
39
def _make_secret():
    """Generate a fresh random secret: base-32 ASCII plus a trailing
    newline, suitable for storing in a private config file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
42
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: runs the introducer client, uploader/downloader,
    optional storage server, optional helper, and the web/FTP/SFTP
    frontends."""
    implements(IStatsProducer)

    # filename (in basedir) holding our listening port number
    PORTNUMFILE = "client.port"
    # subdirectory of basedir used by the storage server
    STOREDIR = 'storage'
    NODETYPE = "client"
    # if this file exists in basedir, __init__ starts a timer that shuts
    # the node down once the file goes missing or stale
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
65
    def __init__(self, basedir="."):
        """Build a client node rooted at 'basedir'.

        Reads configuration via node.Node.__init__, then starts the
        sub-services: introducer client, stats provider, storage server,
        control server, optional helper, uploader/downloader, optional
        remote key generator, FTP/SFTP frontends, and (if configured) the
        web frontend.
        """
        node.Node.__init__(self, basedir)
        # recorded so get_stats() can report node.uptime
        self.started_timestamp = time.time()
        self.logSource="Client"
        # per-instance copy: init_client() mutates k/n/happy from config
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists at startup, poll it every second;
        # _check_hotline stops the reactor when the file is gone or stale
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
98
99     def read_old_config_files(self):
100         node.Node.read_old_config_files(self)
101         copy = self._copy_config_from_file
102         copy("introducer.furl", "client", "introducer.furl")
103         copy("helper.furl", "client", "helper.furl")
104         copy("key_generator.furl", "client", "key_generator.furl")
105         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
106         if os.path.exists(os.path.join(self.basedir, "no_storage")):
107             self.set_config("storage", "enabled", "false")
108         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
109             self.set_config("storage", "readonly", "true")
110         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
111             self.set_config("storage", "debug_discard", "true")
112         if os.path.exists(os.path.join(self.basedir, "run_helper")):
113             self.set_config("helper", "enabled", "true")
114
    def init_introducer_client(self):
        """Create the IntroducerClient and schedule it to start once the
        Tub is running."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        # log failures instead of letting them vanish with the Deferred
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
133
134     def init_stats_provider(self):
135         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
136         self.stats_provider = StatsProvider(self, gatherer_furl)
137         self.add_service(self.stats_provider)
138         self.stats_provider.register_producer(self)
139
140     def get_stats(self):
141         return { 'node.uptime': time.time() - self.started_timestamp }
142
143     def init_lease_secret(self):
144         secret_s = self.get_or_create_private_config("secret", _make_secret)
145         self._lease_secret = base32.a2b(secret_s)
146
    def init_storage(self):
        """Optionally start a StorageServer and publish its FURL.

        Honors [storage]enabled / readonly / reserved_space /
        debug_discard. Publication waits for the Tub, since
        registerReference needs a running Tub.
        """
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        # reserved_space accepts abbreviated sizes ("5G"); an unparseable
        # value is logged and treated as 0
        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
182
    def init_client(self):
        """Set up the upload/download side of the node: encoding
        parameters, convergence secret, node cache, History / Uploader /
        Downloader services, the download cache, and a StubClient
        published to the introducer."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        # config may override the class-level defaults (copied per-instance
        # in __init__)
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        # weak values: nodes are shared while alive, collected when idle
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(History(self.stats_provider))
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
211
212     def get_history(self):
213         return self.getServiceNamed("history")
214
215     def init_control(self):
216         d = self.when_tub_ready()
217         def _publish(res):
218             c = ControlServer()
219             c.setServiceParent(self)
220             control_url = self.tub.registerReference(c)
221             self.write_private_config("control.furl", control_url + "\n")
222         d.addCallback(_publish)
223         d.addErrback(log.err, facility="tahoe.init",
224                      level=log.BAD, umid="d3tNXA")
225
226     def init_helper(self):
227         d = self.when_tub_ready()
228         def _publish(self):
229             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
230             h.setServiceParent(self)
231             # TODO: this is confusing. BASEDIR/private/helper.furl is created
232             # by the helper. BASEDIR/helper.furl is consumed by the client
233             # who wants to use the helper. I like having the filename be the
234             # same, since that makes 'cp' work smoothly, but the difference
235             # between config inputs and generated outputs is hard to see.
236             helper_furlfile = os.path.join(self.basedir,
237                                            "private", "helper.furl")
238             self.tub.registerReference(h, furlFile=helper_furlfile)
239         d.addCallback(_publish)
240         d.addErrback(log.err, facility="tahoe.init",
241                      level=log.BAD, umid="K0mW5w")
242
243     def init_key_gen(self, key_gen_furl):
244         d = self.when_tub_ready()
245         def _subscribe(self):
246             self.tub.connectTo(key_gen_furl, self._got_key_generator)
247         d.addCallback(_subscribe)
248         d.addErrback(log.err, facility="tahoe.init",
249                      level=log.BAD, umid="z9DMzw")
250
    def _got_key_generator(self, key_generator):
        # connectTo callback: remember the remote key generator, and ask to
        # be notified when the connection drops so we can forget it again
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        # with no remote generator, _generate_pubprivkeys falls back to
        # local in-process RSA key generation
        self._key_generator = None
257
258     def get_servers(self, service_name):
259         """ Return set of (peerid, versioned-rref) """
260         assert isinstance(service_name, str)
261         return self.introducer_client.get_peers(service_name)
262
263     def init_web(self, webport):
264         self.log("init_web(webport=%s)", args=(webport,))
265
266         from allmydata.webish import WebishServer
267         nodeurl_path = os.path.join(self.basedir, "node.url")
268         staticdir = self.get_config("node", "web.static", "public_html")
269         staticdir = os.path.expanduser(staticdir)
270         ws = WebishServer(webport, nodeurl_path, staticdir)
271         self.add_service(ws)
272
273     def init_ftp_server(self):
274         if self.get_config("ftpd", "enabled", False, boolean=True):
275             accountfile = self.get_config("ftpd", "accounts.file", None)
276             accounturl = self.get_config("ftpd", "accounts.url", None)
277             ftp_portstr = self.get_config("ftpd", "port", "8021")
278
279             from allmydata.frontends import ftpd
280             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
281             s.setServiceParent(self)
282
283     def init_sftp_server(self):
284         if self.get_config("sftpd", "enabled", False, boolean=True):
285             accountfile = self.get_config("sftpd", "accounts.file", None)
286             accounturl = self.get_config("sftpd", "accounts.url", None)
287             sftp_portstr = self.get_config("sftpd", "port", "8022")
288             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
289             privkey_file = self.get_config("sftpd", "host_privkey_file")
290
291             from allmydata.frontends import sftpd
292             s = sftpd.SFTPServer(self, accountfile, accounturl,
293                                  sftp_portstr, pubkey_file, privkey_file)
294             s.setServiceParent(self)
295
296     def _check_hotline(self, hotline_file):
297         if os.path.exists(hotline_file):
298             mtime = os.stat(hotline_file)[stat.ST_MTIME]
299             if mtime > time.time() - 40.0:
300                 return
301             else:
302                 self.log("hotline file too old, shutting down")
303         else:
304             self.log("hotline file missing, shutting down")
305         reactor.stop()
306
307     def get_all_peerids(self):
308         return self.introducer_client.get_all_peerids()
309     def get_nickname_for_peerid(self, peerid):
310         return self.introducer_client.get_nickname_for_peerid(peerid)
311
312     def get_permuted_peers(self, service_name, key):
313         """
314         @return: list of (peerid, connection,)
315         """
316         assert isinstance(service_name, str)
317         assert isinstance(key, str)
318         return self.introducer_client.get_permuted_peers(service_name, key)
319
320     def get_encoding_parameters(self):
321         return self.DEFAULT_ENCODING_PARAMETERS
322
323     def connected_to_introducer(self):
324         if self.introducer_client:
325             return self.introducer_client.connected_to_introducer()
326         return False
327
328     def get_renewal_secret(self):
329         return hashutil.my_renewal_secret_hash(self._lease_secret)
330
331     def get_cancel_secret(self):
332         return hashutil.my_cancel_secret_hash(self._lease_secret)
333
334     def debug_wait_for_client_connections(self, num_clients):
335         """Return a Deferred that fires (with None) when we have connections
336         to the given number of peers. Useful for tests that set up a
337         temporary test network and need to know when it is safe to proceed
338         with an upload or download."""
339         def _check():
340             current_clients = list(self.get_all_peerids())
341             return len(current_clients) >= num_clients
342         d = self.poll(_check, 0.5)
343         d.addCallback(lambda res: None)
344         return d
345
346
347     # these four methods are the primitives for creating filenodes and
348     # dirnodes. The first takes a URI and produces a filenode or (new-style)
349     # dirnode. The other three create brand-new filenodes/dirnodes.
350
351     def create_node_from_uri(self, u):
352         # this returns synchronously.
353         u = IURI(u)
354         u_s = u.to_string()
355         if u_s not in self._node_cache:
356             if IReadonlyNewDirectoryURI.providedBy(u):
357                 # new-style read-only dirnodes
358                 node = NewDirectoryNode(self).init_from_uri(u)
359             elif INewDirectoryURI.providedBy(u):
360                 # new-style dirnodes
361                 node = NewDirectoryNode(self).init_from_uri(u)
362             elif IFileURI.providedBy(u):
363                 if isinstance(u, LiteralFileURI):
364                     node = LiteralFileNode(u, self) # LIT
365                 else:
366                     key = base32.b2a(u.storage_index)
367                     cachefile = self.download_cache.get_file(key)
368                     node = FileNode(u, self, cachefile) # CHK
369             else:
370                 assert IMutableFileURI.providedBy(u), u
371                 node = MutableFileNode(self).init_from_uri(u)
372             self._node_cache[u_s] = node
373         return self._node_cache[u_s]
374
375     def create_empty_dirnode(self):
376         n = NewDirectoryNode(self)
377         d = n.create(self._generate_pubprivkeys)
378         d.addCallback(lambda res: n)
379         return d
380
381     def create_mutable_file(self, contents=""):
382         n = MutableFileNode(self)
383         d = n.create(contents, self._generate_pubprivkeys)
384         d.addCallback(lambda res: n)
385         return d
386
387     def _generate_pubprivkeys(self, key_size):
388         if self._key_generator:
389             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
390             def make_key_objs((verifying_key, signing_key)):
391                 v = rsa.create_verifying_key_from_string(verifying_key)
392                 s = rsa.create_signing_key_from_string(signing_key)
393                 return v, s
394             d.addCallback(make_key_objs)
395             return d
396         else:
397             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
398             # secs
399             signer = rsa.generate(key_size)
400             verifier = signer.get_verifying_key()
401             return verifier, signer
402
403     def upload(self, uploadable):
404         uploader = self.getServiceNamed("uploader")
405         return uploader.upload(uploadable, history=self.get_history())
406
407
408     def list_all_upload_statuses(self):
409         return self.get_history().list_all_upload_statuses()
410
411     def list_all_download_statuses(self):
412         return self.get_history().list_all_download_statuses()
413
414     def list_all_mapupdate_statuses(self):
415         return self.get_history().list_all_mapupdate_statuses()
416     def list_all_publish_statuses(self):
417         return self.get_history().list_all_publish_statuses()
418     def list_all_retrieve_statuses(self):
419         return self.get_history().list_all_retrieve_statuses()
420
421     def list_all_helper_statuses(self):
422         try:
423             helper = self.getServiceNamed("helper")
424         except KeyError:
425             return []
426         return helper.get_all_upload_statuses()
427