# Tahoe-LAFS -- src/allmydata/client.py
# (captured from a git-web blob view; commit message: "introducer: add
# get_nickname_for_peerid")
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, testutil
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
# Binary (power-of-1024) size units, used for encoding parameters and
# storage size limits below.
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
34
class StubClient(Referenceable):
    # An empty remotely-referenceable object. It is published to the
    # introducer (see init_client) so the introducer can count connected
    # clients and see what versions they run.
    implements(RIStubClient)
37
def _make_secret():
    """Return a freshly generated random secret: base32, newline-terminated."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
40
class Client(node.Node, testutil.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
63
64     def __init__(self, basedir="."):
65         node.Node.__init__(self, basedir)
66         self.started_timestamp = time.time()
67         self.logSource="Client"
68         self.nickname = self.get_config("nickname")
69         if self.nickname is None:
70             self.nickname = "<unspecified>"
71         self.init_introducer_client()
72         self.init_stats_provider()
73         self.init_lease_secret()
74         self.init_storage()
75         self.init_control()
76         run_helper = self.get_config("run_helper")
77         if run_helper:
78             self.init_helper()
79         self.init_client()
80         self._key_generator = None
81         key_gen_furl = self.get_config('key_generator.furl')
82         if key_gen_furl:
83             self.init_key_gen(key_gen_furl)
84         # ControlServer and Helper are attached after Tub startup
85
86         hotline_file = os.path.join(self.basedir,
87                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
88         if os.path.exists(hotline_file):
89             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
90             self.log("hotline file noticed (%ds old), starting timer" % age)
91             hotline = TimerService(1.0, self._check_hotline, hotline_file)
92             hotline.setServiceParent(self)
93
94         webport = self.get_config("webport")
95         if webport:
96             self.init_web(webport) # strports string
97
98     def init_introducer_client(self):
99         self.introducer_furl = self.get_config("introducer.furl", required=True)
100         ic = IntroducerClient(self.tub, self.introducer_furl,
101                               self.nickname,
102                               str(allmydata.__version__),
103                               str(self.OLDEST_SUPPORTED_VERSION))
104         self.introducer_client = ic
105         # hold off on starting the IntroducerClient until our tub has been
106         # started, so we'll have a useful address on our RemoteReference, so
107         # that the introducer's status page will show us.
108         d = self.when_tub_ready()
109         def _start_introducer_client(res):
110             ic.setServiceParent(self)
111             # nodes that want to upload and download will need storage servers
112             ic.subscribe_to("storage")
113         d.addCallback(_start_introducer_client)
114         d.addErrback(log.err, facility="tahoe.init",
115                      level=log.BAD, umid="URyI5w")
116
117     def init_stats_provider(self):
118         gatherer_furl = self.get_config('stats_gatherer.furl')
119         self.stats_provider = StatsProvider(self, gatherer_furl)
120         self.add_service(self.stats_provider)
121         self.stats_provider.register_producer(self)
122
123     def get_stats(self):
124         return { 'node.uptime': time.time() - self.started_timestamp }
125
126     def init_lease_secret(self):
127         secret_s = self.get_or_create_private_config("secret", _make_secret)
128         self._lease_secret = base32.a2b(secret_s)
129
130     def init_storage(self):
131         # should we run a storage server (and publish it for others to use)?
132         provide_storage = (self.get_config("no_storage") is None)
133         if not provide_storage:
134             return
135         readonly_storage = (self.get_config("readonly_storage") is not None)
136
137         storedir = os.path.join(self.basedir, self.STOREDIR)
138         sizelimit = None
139
140         data = self.get_config("sizelimit")
141         if data:
142             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
143             if not m:
144                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
145             else:
146                 number, suffix = m.groups()
147                 suffix = suffix.upper()
148                 if suffix.endswith("B"):
149                     suffix = suffix[:-1]
150                 multiplier = {"": 1,
151                               "K": 1000,
152                               "M": 1000 * 1000,
153                               "G": 1000 * 1000 * 1000,
154                               }[suffix]
155                 sizelimit = int(number) * multiplier
156         discard_storage = self.get_config("debug_discard_storage") is not None
157         ss = StorageServer(storedir, sizelimit,
158                            discard_storage, readonly_storage,
159                            self.stats_provider)
160         self.add_service(ss)
161         d = self.when_tub_ready()
162         # we can't do registerReference until the Tub is ready
163         def _publish(res):
164             furl_file = os.path.join(self.basedir, "private", "storage.furl")
165             furl = self.tub.registerReference(ss, furlFile=furl_file)
166             ri_name = RIStorageServer.__remote_name__
167             self.introducer_client.publish(furl, "storage", ri_name)
168         d.addCallback(_publish)
169         d.addErrback(log.err, facility="tahoe.init",
170                      level=log.BAD, umid="aLGBKw")
171
172     def init_client(self):
173         helper_furl = self.get_config("helper.furl")
174         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
175         self.convergence = base32.a2b(convergence_s)
176         self._node_cache = weakref.WeakValueDictionary() # uri -> node
177         self.add_service(Uploader(helper_furl, self.stats_provider))
178         self.add_service(Downloader(self.stats_provider))
179         self.add_service(MutableWatcher(self.stats_provider))
180         def _publish(res):
181             # we publish an empty object so that the introducer can count how
182             # many clients are connected and see what versions they're
183             # running.
184             sc = StubClient()
185             furl = self.tub.registerReference(sc)
186             ri_name = RIStubClient.__remote_name__
187             self.introducer_client.publish(furl, "stub_client", ri_name)
188         d = self.when_tub_ready()
189         d.addCallback(_publish)
190         d.addErrback(log.err, facility="tahoe.init",
191                      level=log.BAD, umid="OEHq3g")
192
193     def init_control(self):
194         d = self.when_tub_ready()
195         def _publish(res):
196             c = ControlServer()
197             c.setServiceParent(self)
198             control_url = self.tub.registerReference(c)
199             self.write_private_config("control.furl", control_url + "\n")
200         d.addCallback(_publish)
201         d.addErrback(log.err, facility="tahoe.init",
202                      level=log.BAD, umid="d3tNXA")
203
204     def init_helper(self):
205         d = self.when_tub_ready()
206         def _publish(self):
207             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
208             h.setServiceParent(self)
209             # TODO: this is confusing. BASEDIR/private/helper.furl is created
210             # by the helper. BASEDIR/helper.furl is consumed by the client
211             # who wants to use the helper. I like having the filename be the
212             # same, since that makes 'cp' work smoothly, but the difference
213             # between config inputs and generated outputs is hard to see.
214             helper_furlfile = os.path.join(self.basedir,
215                                            "private", "helper.furl")
216             self.tub.registerReference(h, furlFile=helper_furlfile)
217         d.addCallback(_publish)
218         d.addErrback(log.err, facility="tahoe.init",
219                      level=log.BAD, umid="K0mW5w")
220
221     def init_key_gen(self, key_gen_furl):
222         d = self.when_tub_ready()
223         def _subscribe(self):
224             self.tub.connectTo(key_gen_furl, self._got_key_generator)
225         d.addCallback(_subscribe)
226         d.addErrback(log.err, facility="tahoe.init",
227                      level=log.BAD, umid="z9DMzw")
228
229     def _got_key_generator(self, key_generator):
230         self._key_generator = key_generator
231         key_generator.notifyOnDisconnect(self._lost_key_generator)
232
233     def _lost_key_generator(self):
234         self._key_generator = None
235
236     def init_web(self, webport):
237         self.log("init_web(webport=%s)", args=(webport,))
238
239         from allmydata.webish import WebishServer
240         nodeurl_path = os.path.join(self.basedir, "node.url")
241         ws = WebishServer(webport, nodeurl_path)
242         if self.get_config("webport_allow_localfile") is not None:
243             ws.allow_local_access(True)
244         self.add_service(ws)
245
246     def _check_hotline(self, hotline_file):
247         if os.path.exists(hotline_file):
248             mtime = os.stat(hotline_file)[stat.ST_MTIME]
249             if mtime > time.time() - 20.0:
250                 return
251             else:
252                 self.log("hotline file too old, shutting down")
253         else:
254             self.log("hotline file missing, shutting down")
255         reactor.stop()
256
257     def get_all_peerids(self):
258         return self.introducer_client.get_all_peerids()
259     def get_nickname_for_peerid(self, peerid):
260         return self.introducer_client.get_nickname_for_peerid(peerid)
261
262     def get_permuted_peers(self, service_name, key):
263         """
264         @return: list of (peerid, connection,)
265         """
266         assert isinstance(service_name, str)
267         assert isinstance(key, str)
268         return self.introducer_client.get_permuted_peers(service_name, key)
269
270     def get_encoding_parameters(self):
271         return self.DEFAULT_ENCODING_PARAMETERS
272
273     def connected_to_introducer(self):
274         if self.introducer_client:
275             return self.introducer_client.connected_to_introducer()
276         return False
277
278     def get_renewal_secret(self):
279         return hashutil.my_renewal_secret_hash(self._lease_secret)
280
281     def get_cancel_secret(self):
282         return hashutil.my_cancel_secret_hash(self._lease_secret)
283
284     def debug_wait_for_client_connections(self, num_clients):
285         """Return a Deferred that fires (with None) when we have connections
286         to the given number of peers. Useful for tests that set up a
287         temporary test network and need to know when it is safe to proceed
288         with an upload or download."""
289         def _check():
290             current_clients = list(self.get_all_peerids())
291             return len(current_clients) >= num_clients
292         d = self.poll(_check, 0.5)
293         d.addCallback(lambda res: None)
294         return d
295
296
297     # these four methods are the primitives for creating filenodes and
298     # dirnodes. The first takes a URI and produces a filenode or (new-style)
299     # dirnode. The other three create brand-new filenodes/dirnodes.
300
301     def create_node_from_uri(self, u):
302         # this returns synchronously.
303         u = IURI(u)
304         u_s = u.to_string()
305         if u_s not in self._node_cache:
306             if IReadonlyNewDirectoryURI.providedBy(u):
307                 # new-style read-only dirnodes
308                 node = NewDirectoryNode(self).init_from_uri(u)
309             elif INewDirectoryURI.providedBy(u):
310                 # new-style dirnodes
311                 node = NewDirectoryNode(self).init_from_uri(u)
312             elif IFileURI.providedBy(u):
313                 if isinstance(u, LiteralFileURI):
314                     node = LiteralFileNode(u, self) # LIT
315                 else:
316                     node = FileNode(u, self) # CHK
317             else:
318                 assert IMutableFileURI.providedBy(u), u
319                 node = MutableFileNode(self).init_from_uri(u)
320             self._node_cache[u_s] = node
321         return self._node_cache[u_s]
322
323     def notify_publish(self, publish_status, size):
324         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
325                                                                size)
326     def notify_retrieve(self, retrieve_status):
327         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
328     def notify_mapupdate(self, update_status):
329         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
330
331     def create_empty_dirnode(self):
332         n = NewDirectoryNode(self)
333         d = n.create(self._generate_pubprivkeys)
334         d.addCallback(lambda res: n)
335         return d
336
337     def create_mutable_file(self, contents=""):
338         n = MutableFileNode(self)
339         d = n.create(contents, self._generate_pubprivkeys)
340         d.addCallback(lambda res: n)
341         return d
342
343     def _generate_pubprivkeys(self, key_size):
344         if self._key_generator:
345             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
346             def make_key_objs((verifying_key, signing_key)):
347                 v = rsa.create_verifying_key_from_string(verifying_key)
348                 s = rsa.create_signing_key_from_string(signing_key)
349                 return v, s
350             d.addCallback(make_key_objs)
351             return d
352         else:
353             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
354             # secs
355             signer = rsa.generate(key_size)
356             verifier = signer.get_verifying_key()
357             return verifier, signer
358
359     def upload(self, uploadable):
360         uploader = self.getServiceNamed("uploader")
361         return uploader.upload(uploadable)
362
363
364     def list_all_upload_statuses(self):
365         uploader = self.getServiceNamed("uploader")
366         return uploader.list_all_upload_statuses()
367
368     def list_all_download_statuses(self):
369         downloader = self.getServiceNamed("downloader")
370         return downloader.list_all_download_statuses()
371
372     def list_all_mapupdate_statuses(self):
373         watcher = self.getServiceNamed("mutable-watcher")
374         return watcher.list_all_mapupdate_statuses()
375     def list_all_publish_statuses(self):
376         watcher = self.getServiceNamed("mutable-watcher")
377         return watcher.list_all_publish_statuses()
378     def list_all_retrieve_statuses(self):
379         watcher = self.getServiceNamed("mutable-watcher")
380         return watcher.list_all_retrieve_statuses()
381
382     def list_all_helper_statuses(self):
383         try:
384             helper = self.getServiceNamed("helper")
385         except KeyError:
386             return []
387         return helper.get_all_upload_statuses()
388