# src/allmydata/client.py (tahoe-lafs)
# [gitweb header removed; commit subject was: "util: move PollMixin to a
#  separate file (pollmixin.py), so testutil can be moved..."]
1
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, fileutil
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
# binary (power-of-two) size constants; used below for encoding parameters
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
34
class StubClient(Referenceable):
    """An empty referenceable, published to the introducer (see
    init_client) purely so the introducer can count connected clients and
    see their versions."""
    implements(RIStubClient)
37
def _make_secret():
    """Return a fresh random secret, base32-encoded with a trailing newline
    so it round-trips cleanly through a one-line private config file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
40
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: extends the base Node with storage,
    upload/download, helper, control, web, and FTP services, all wired up
    from the node's configuration at startup."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # while this file exists (and stays fresh), the node keeps running; see
    # _check_hotline below
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
63
    def __init__(self, basedir="."):
        """Build the client node from the config found under `basedir`."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # feeds the node.uptime stat
        self.logSource="Client"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()

        # If the 'suicide prevention hotline' file exists, poll it every
        # second and shut the node down once it goes stale (_check_hotline).
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
94
95     def read_old_config_files(self):
96         node.Node.read_old_config_files(self)
97         copy = self._copy_config_from_file
98         copy("introducer.furl", "client", "introducer.furl")
99         copy("helper.furl", "client", "helper.furl")
100         copy("key_generator.furl", "client", "key_generator.furl")
101         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
102         if os.path.exists(os.path.join(self.basedir, "no_storage")):
103             self.set_config("storage", "enabled", "false")
104         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
105             self.set_config("storage", "readonly", "true")
106         copy("sizelimit", "storage", "sizelimit")
107         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
108             self.set_config("storage", "debug_discard", "true")
109         if os.path.exists(os.path.join(self.basedir, "run_helper")):
110             self.set_config("helper", "enabled", "true")
111
    def init_introducer_client(self):
        """Create the IntroducerClient and start it once the Tub is ready."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
130
131     def init_stats_provider(self):
132         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
133         self.stats_provider = StatsProvider(self, gatherer_furl)
134         self.add_service(self.stats_provider)
135         self.stats_provider.register_producer(self)
136
137     def get_stats(self):
138         return { 'node.uptime': time.time() - self.started_timestamp }
139
140     def init_lease_secret(self):
141         secret_s = self.get_or_create_private_config("secret", _make_secret)
142         self._lease_secret = base32.a2b(secret_s)
143
    def init_storage(self):
        """Start a StorageServer and publish its FURL via the introducer,
        unless [storage]enabled is false."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        # optional size limit, e.g. "100000000", "5G", "800mb"; decimal
        # multipliers (K=1000), with an optional trailing 'B' ignored
        sizelimit = None
        data = self.get_config("storage", "sizelimit", None)
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir, sizelimit, discard, readonly,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
184
    def init_client(self):
        """Set up upload/download machinery: convergence secret, node cache,
        Uploader, Downloader, MutableWatcher, plus a StubClient announcement
        so the introducer can count us."""
        helper_furl = self.get_config("client", "helper.furl", None)
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.download_cachedir = os.path.join(self.basedir,
                                              "private", "cache", "download")
        fileutil.make_dirs(self.download_cachedir)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
208
209     def init_control(self):
210         d = self.when_tub_ready()
211         def _publish(res):
212             c = ControlServer()
213             c.setServiceParent(self)
214             control_url = self.tub.registerReference(c)
215             self.write_private_config("control.furl", control_url + "\n")
216         d.addCallback(_publish)
217         d.addErrback(log.err, facility="tahoe.init",
218                      level=log.BAD, umid="d3tNXA")
219
220     def init_helper(self):
221         d = self.when_tub_ready()
222         def _publish(self):
223             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
224             h.setServiceParent(self)
225             # TODO: this is confusing. BASEDIR/private/helper.furl is created
226             # by the helper. BASEDIR/helper.furl is consumed by the client
227             # who wants to use the helper. I like having the filename be the
228             # same, since that makes 'cp' work smoothly, but the difference
229             # between config inputs and generated outputs is hard to see.
230             helper_furlfile = os.path.join(self.basedir,
231                                            "private", "helper.furl")
232             self.tub.registerReference(h, furlFile=helper_furlfile)
233         d.addCallback(_publish)
234         d.addErrback(log.err, facility="tahoe.init",
235                      level=log.BAD, umid="K0mW5w")
236
237     def init_key_gen(self, key_gen_furl):
238         d = self.when_tub_ready()
239         def _subscribe(self):
240             self.tub.connectTo(key_gen_furl, self._got_key_generator)
241         d.addCallback(_subscribe)
242         d.addErrback(log.err, facility="tahoe.init",
243                      level=log.BAD, umid="z9DMzw")
244
    def _got_key_generator(self, key_generator):
        # Remote key-generator connection established: remember it, and
        # arrange to forget it again when the connection drops.
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        # Fall back to local RSA key generation until we reconnect.
        self._key_generator = None
251
252     def init_web(self, webport):
253         self.log("init_web(webport=%s)", args=(webport,))
254
255         from allmydata.webish import WebishServer
256         nodeurl_path = os.path.join(self.basedir, "node.url")
257         ws = WebishServer(webport, nodeurl_path)
258         self.add_service(ws)
259
260     def init_ftp_server(self):
261         if self.get_config("ftpd", "enabled", False, boolean=True):
262             accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
263             accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
264             ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
265
266             from allmydata import ftpd
267             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
268             s.setServiceParent(self)
269
270     def _check_hotline(self, hotline_file):
271         if os.path.exists(hotline_file):
272             mtime = os.stat(hotline_file)[stat.ST_MTIME]
273             if mtime > time.time() - 20.0:
274                 return
275             else:
276                 self.log("hotline file too old, shutting down")
277         else:
278             self.log("hotline file missing, shutting down")
279         reactor.stop()
280
    def get_all_peerids(self):
        """Return all peerids currently known to the introducer client."""
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        """Return the nickname announced for `peerid` via the introducer."""
        return self.introducer_client.get_nickname_for_peerid(peerid)
285
    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        # delegates to the introducer client, which permutes the server list
        # by `key` for share placement
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
293
    def get_encoding_parameters(self):
        """Return the default erasure-coding parameters (k/happy/n/segsize)."""
        return self.DEFAULT_ENCODING_PARAMETERS
296
297     def connected_to_introducer(self):
298         if self.introducer_client:
299             return self.introducer_client.connected_to_introducer()
300         return False
301
    def get_renewal_secret(self):
        # derived from the node-wide lease secret; handed to storage servers
        # so leases can later be renewed
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        # derived from the node-wide lease secret; handed to storage servers
        # so leases can later be cancelled
        return hashutil.my_cancel_secret_hash(self._lease_secret)
307
308     def debug_wait_for_client_connections(self, num_clients):
309         """Return a Deferred that fires (with None) when we have connections
310         to the given number of peers. Useful for tests that set up a
311         temporary test network and need to know when it is safe to proceed
312         with an upload or download."""
313         def _check():
314             current_clients = list(self.get_all_peerids())
315             return len(current_clients) >= num_clients
316         d = self.poll(_check, 0.5)
317         d.addCallback(lambda res: None)
318         return d
319
320
    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        """Return the filenode/dirnode for URI `u`, caching by URI string.

        The cache is a WeakValueDictionary, so live nodes are shared across
        callers without being kept alive by the cache itself."""
        # this returns synchronously.
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    cachefile = os.path.join(self.download_cachedir,
                                             base32.b2a(u.storage_index))
                    # TODO: cachefile manager, weakref, expire policy
                    node = FileNode(u, self, cachefile) # CHK
            else:
                # anything else must be a mutable-file URI
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]
349
    # status-notification passthroughs to the MutableWatcher service

    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
357
358     def create_empty_dirnode(self):
359         n = NewDirectoryNode(self)
360         d = n.create(self._generate_pubprivkeys)
361         d.addCallback(lambda res: n)
362         return d
363
364     def create_mutable_file(self, contents=""):
365         n = MutableFileNode(self)
366         d = n.create(contents, self._generate_pubprivkeys)
367         d.addCallback(lambda res: n)
368         return d
369
370     def _generate_pubprivkeys(self, key_size):
371         if self._key_generator:
372             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
373             def make_key_objs((verifying_key, signing_key)):
374                 v = rsa.create_verifying_key_from_string(verifying_key)
375                 s = rsa.create_signing_key_from_string(signing_key)
376                 return v, s
377             d.addCallback(make_key_objs)
378             return d
379         else:
380             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
381             # secs
382             signer = rsa.generate(key_size)
383             verifier = signer.get_verifying_key()
384             return verifier, signer
385
    def upload(self, uploadable):
        """Hand `uploadable` to the Uploader service; returns its result."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)
389
390
    def list_all_upload_statuses(self):
        """Return status objects for all uploads (active and recent)."""
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        """Return status objects for all downloads (active and recent)."""
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()
398
    def list_all_mapupdate_statuses(self):
        """Return status objects for mutable-file servermap updates."""
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        """Return status objects for mutable-file publish operations."""
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        """Return status objects for mutable-file retrieve operations."""
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()
408
    def list_all_helper_statuses(self):
        """Return upload statuses from the Helper service, or [] if this
        node is not running a helper."""
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            # no helper configured on this node
            return []
        return helper.get_all_upload_statuses()
415