
import os, stat, time, re, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap import Referenceable
from foolscap.logging import log
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage import StorageServer
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, testutil
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient

KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

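# make a new random secret: base32-encoded, with a trailing newline so it can
# be written directly into a private config file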
def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class Client(node.Node, testutil.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats this node as though it were
    # a 1.0.0 storage client, everything will work as expected.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # These are the default encoding parameters. 'k' (needed) is the number
    # of shares required to reconstruct a file. 'happy' (desired) means that
    # we will abort an upload unless we can allocate space for at least this
    # many shares. 'n' (total) is the total number of shares created by
    # encoding. If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup

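        # if BASEDIR/suicide_prevention_hotline exists at startup, poll its
        # mtime once per second and shut the node down when the file is
        # removed or goes untouched for 20 seconds (see _check_hotline below)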
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def read_old_config_files(self):
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        copy("sizelimit", "storage", "sizelimit")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so that our RemoteReference will have a useful address and
        # the introducer's status page can show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
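        # the lease secret is the root from which the per-node renewal and
        # cancel secrets are derived (see get_renewal_secret and
        # get_cancel_secret below)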
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

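        # "sizelimit" accepts an integer with an optional K/M/G suffix
        # (optionally followed by B), interpreted as a decimal multiplier
        # (1K = 1000 bytes)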
        sizelimit = None
        data = self.get_config("storage", "sizelimit", None)
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("storage.sizelimit contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir, sizelimit, discard, readonly,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
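        # set up the upload, download, and mutable-file services for this node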
        helper_furl = self.get_config("client", "helper.furl", None)
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_control(self):
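        # register a ControlServer and write its FURL to
        # BASEDIR/private/control.furl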
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        self.add_service(ws)

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_all_peerids(self):
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        return self.introducer_client.get_nickname_for_peerid(peerid)

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these methods are the primitives for creating filenodes and dirnodes.
    # The first takes a URI and produces a filenode or (new-style) dirnode.
    # The others create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        # this returns synchronously.
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    node = FileNode(u, self) # CHK
            else:
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]

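    # these hooks let mutable-file operations report their status to the
    # MutableWatcher service, which records them for the list_all_*_statuses
    # methods below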
    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)

    def create_empty_dirnode(self):
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def _generate_pubprivkeys(self, key_size):
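        # if a remote key generator is connected, ask it for a keypair;
        # otherwise generate one locally, which blocks for a few seconds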
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


    def list_all_upload_statuses(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
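        # the helper service is optional, so tolerate its absence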
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()