
import os, stat, time, re, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap import Referenceable
from foolscap.logging import log
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
from allmydata.checker import Checker
from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, testutil
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient

KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

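# create a new random secret: base32-encoded, with a trailing newline so it
# can be written directly into a file under BASEDIR/private/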
def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class Client(node.Node, testutil.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # we're pretty narrow-minded right now
    OLDEST_SUPPORTED_VERSION = allmydata.__version__

    # these are the default encoding parameters. 'k' is the number of shares
    # required to reconstruct a file. 'happy' means that we will abort an
    # upload unless we can allocate space for at least this many shares. 'n'
    # is the total number of shares created by encoding; if everybody has
    # room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        run_helper = self.get_config("run_helper")
        if run_helper:
            self.init_helper()
        self.init_client()
        self._key_generator = None
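        # if a key_generator.furl is configured, RSA keypairs for mutable
        # files are fetched from that remote service instead of being
        # generated locally (see init_key_gen and _generate_pubprivkeys)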
        key_gen_furl = self.get_config('key_generator.furl')
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup

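        # the hotline file, if present, arranges for the node to shut itself
        # down unless the file is touched at least once every 20 seconds
        # (see _check_hotline below)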
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("webport")
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("introducer.furl", required=True)
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_stats_provider(self):
        gatherer_furl = self.get_config('stats_gatherer.furl')
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        provide_storage = (self.get_config("no_storage") is None)
        if not provide_storage:
            return
        readonly_storage = (self.get_config("readonly_storage") is not None)

        storedir = os.path.join(self.basedir, self.STOREDIR)
        sizelimit = None

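        # the optional "sizelimit" config value is a number with an optional
        # suffix, e.g. "5000000", "100kB", "2G". Suffixes are decimal: powers
        # of 1000, not 1024.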
        data = self.get_config("sizelimit")
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard_storage = self.get_config("debug_discard_storage") is not None
        ss = StorageServer(storedir, sizelimit,
                           discard_storage, readonly_storage,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_client(self):
        helper_furl = self.get_config("helper.furl")
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.add_service(Downloader(self.stats_provider))
        self.add_service(Checker())
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def _got_key_generator(self, key_generator):
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        if self.get_config("webport_allow_localfile") is not None:
            ws.allow_local_access(True)
        self.add_service(ws)

    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_all_peerids(self):
        return self.introducer_client.get_all_peerids()

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

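    # the renewal and cancel secrets are both derived from the private
    # "secret" value created by init_lease_secret, so they stay stable
    # across restarts of this node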
    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these methods are the primitives for creating filenodes and dirnodes.
    # The first takes a URI and produces a filenode or (new-style) dirnode.
    # The others create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        # this returns synchronously.
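        # nodes are cached in a WeakValueDictionary keyed by URI string, so
        # asking for the same URI twice returns the same node instance for
        # as long as someone else still holds a reference to it.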
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                # CHK
                node = FileNode(u, self)
            else:
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]

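    # these forward status reports from mutable-file publish, retrieve, and
    # mapupdate operations to the MutableWatcher service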
    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)

    def create_empty_dirnode(self):
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def _generate_pubprivkeys(self, key_size):
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


    def list_all_upload_statuses(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()