]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
#527: expire the cached files that are used to support Range: headers, every hour...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
29 KiB=1024
30 MiB=1024*KiB
31 GiB=1024*MiB
32 TiB=1024*GiB
33 PiB=1024*TiB
34
class StubClient(Referenceable):
    # A minimal referenceable with no methods of its own. init_client()
    # publishes one of these under the "stub_client" service name so the
    # introducer can count connected clients and see what versions they run.
    implements(RIStubClient)
37
def _make_secret():
    """Generate a fresh random secret.

    Returns CRYPTO_VAL_SIZE random bytes, base32-encoded and
    newline-terminated, suitable for storing in a private config file.
    """
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(raw) + "\n"
40
class Client(node.Node, pollmixin.PollMixin):
    """The main Tahoe client node.

    Wires together the introducer client, optional storage server, optional
    helper, uploader/downloader services, and the web/FTP frontends. Also
    acts as an IStatsProducer, reporting node uptime.
    """
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"          # records the Tub's listening port
    STOREDIR = 'storage'                 # subdirectory used by the storage server
    NODETYPE = "client"
    # Presence of this file in basedir starts a watchdog timer; see
    # __init__ and _check_hotline for the shutdown behavior.
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
    def __init__(self, basedir="."):
        """Initialize all client services rooted at 'basedir'.

        Service startup order matters: the introducer client and stats
        provider come first, then storage, control, (optional) helper,
        and the client-side upload/download services.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # used by get_stats() for uptime
        self.logSource="Client"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        # starts as None; set when a remote key-generator connects
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()

        # If the "hotline" file exists, poll it every second; _check_hotline
        # shuts the node down once the file goes missing or stale.
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
94
95     def read_old_config_files(self):
96         node.Node.read_old_config_files(self)
97         copy = self._copy_config_from_file
98         copy("introducer.furl", "client", "introducer.furl")
99         copy("helper.furl", "client", "helper.furl")
100         copy("key_generator.furl", "client", "key_generator.furl")
101         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
102         if os.path.exists(os.path.join(self.basedir, "no_storage")):
103             self.set_config("storage", "enabled", "false")
104         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
105             self.set_config("storage", "readonly", "true")
106         copy("sizelimit", "storage", "sizelimit")
107         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
108             self.set_config("storage", "debug_discard", "true")
109         if os.path.exists(os.path.join(self.basedir, "run_helper")):
110             self.set_config("helper", "enabled", "true")
111
    def init_introducer_client(self):
        """Create the IntroducerClient and schedule it to start after Tub startup."""
        # [client]introducer.furl has no default: a missing value is an error
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
130
131     def init_stats_provider(self):
132         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
133         self.stats_provider = StatsProvider(self, gatherer_furl)
134         self.add_service(self.stats_provider)
135         self.stats_provider.register_producer(self)
136
137     def get_stats(self):
138         return { 'node.uptime': time.time() - self.started_timestamp }
139
    def init_lease_secret(self):
        # The master lease secret is persisted in BASEDIR/private/secret
        # (created on first run); get_renewal_secret/get_cancel_secret
        # derive their per-purpose secrets from it.
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)
143
144     def init_storage(self):
145         # should we run a storage server (and publish it for others to use)?
146         if not self.get_config("storage", "enabled", True, boolean=True):
147             return
148         readonly = self.get_config("storage", "readonly", False, boolean=True)
149
150         storedir = os.path.join(self.basedir, self.STOREDIR)
151
152         sizelimit = None
153         data = self.get_config("storage", "sizelimit", None)
154         if data:
155             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
156             if not m:
157                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
158             else:
159                 number, suffix = m.groups()
160                 suffix = suffix.upper()
161                 if suffix.endswith("B"):
162                     suffix = suffix[:-1]
163                 multiplier = {"": 1,
164                               "K": 1000,
165                               "M": 1000 * 1000,
166                               "G": 1000 * 1000 * 1000,
167                               }[suffix]
168                 sizelimit = int(number) * multiplier
169         discard = self.get_config("storage", "debug_discard", False,
170                                   boolean=True)
171         ss = StorageServer(storedir, sizelimit, discard, readonly,
172                            self.stats_provider)
173         self.add_service(ss)
174         d = self.when_tub_ready()
175         # we can't do registerReference until the Tub is ready
176         def _publish(res):
177             furl_file = os.path.join(self.basedir, "private", "storage.furl")
178             furl = self.tub.registerReference(ss, furlFile=furl_file)
179             ri_name = RIStorageServer.__remote_name__
180             self.introducer_client.publish(furl, "storage", ri_name)
181         d.addCallback(_publish)
182         d.addErrback(log.err, facility="tahoe.init",
183                      level=log.BAD, umid="aLGBKw")
184
    def init_client(self):
        """Start the client-side services: uploader, download cache,
        downloader, mutable watcher, and a StubClient announcement."""
        helper_furl = self.get_config("client", "helper.furl", None)
        # convergence secret lives in BASEDIR/private/convergence
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        # weak cache so identical URIs share one node object while in use
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")
209
210     def init_control(self):
211         d = self.when_tub_ready()
212         def _publish(res):
213             c = ControlServer()
214             c.setServiceParent(self)
215             control_url = self.tub.registerReference(c)
216             self.write_private_config("control.furl", control_url + "\n")
217         d.addCallback(_publish)
218         d.addErrback(log.err, facility="tahoe.init",
219                      level=log.BAD, umid="d3tNXA")
220
221     def init_helper(self):
222         d = self.when_tub_ready()
223         def _publish(self):
224             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
225             h.setServiceParent(self)
226             # TODO: this is confusing. BASEDIR/private/helper.furl is created
227             # by the helper. BASEDIR/helper.furl is consumed by the client
228             # who wants to use the helper. I like having the filename be the
229             # same, since that makes 'cp' work smoothly, but the difference
230             # between config inputs and generated outputs is hard to see.
231             helper_furlfile = os.path.join(self.basedir,
232                                            "private", "helper.furl")
233             self.tub.registerReference(h, furlFile=helper_furlfile)
234         d.addCallback(_publish)
235         d.addErrback(log.err, facility="tahoe.init",
236                      level=log.BAD, umid="K0mW5w")
237
238     def init_key_gen(self, key_gen_furl):
239         d = self.when_tub_ready()
240         def _subscribe(self):
241             self.tub.connectTo(key_gen_furl, self._got_key_generator)
242         d.addCallback(_subscribe)
243         d.addErrback(log.err, facility="tahoe.init",
244                      level=log.BAD, umid="z9DMzw")
245
    def _got_key_generator(self, key_generator):
        # remote key-generator connected: remember the reference and watch
        # for disconnect so we can fall back to local generation
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        # connection lost: _generate_pubprivkeys will generate keys locally
        self._key_generator = None
252
253     def init_web(self, webport):
254         self.log("init_web(webport=%s)", args=(webport,))
255
256         from allmydata.webish import WebishServer
257         nodeurl_path = os.path.join(self.basedir, "node.url")
258         staticdir = self.get_config("node", "web.static", "public_html")
259         staticdir = os.path.expanduser(staticdir)
260         ws = WebishServer(webport, nodeurl_path, staticdir)
261         self.add_service(ws)
262
263     def init_ftp_server(self):
264         if self.get_config("ftpd", "enabled", False, boolean=True):
265             accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
266             accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
267             ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
268
269             from allmydata import ftpd
270             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
271             s.setServiceParent(self)
272
273     def _check_hotline(self, hotline_file):
274         if os.path.exists(hotline_file):
275             mtime = os.stat(hotline_file)[stat.ST_MTIME]
276             if mtime > time.time() - 20.0:
277                 return
278             else:
279                 self.log("hotline file too old, shutting down")
280         else:
281             self.log("hotline file missing, shutting down")
282         reactor.stop()
283
    def get_all_peerids(self):
        # delegate: all peerids currently known to the introducer client
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        # delegate: nickname announced by the given peer (via the introducer)
        return self.introducer_client.get_nickname_for_peerid(peerid)
288
    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        # NOTE(review): asserts validate argument types only in non-optimized
        # runs (stripped under python -O)
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
296
    def get_encoding_parameters(self):
        # class-level defaults; see DEFAULT_ENCODING_PARAMETERS for the
        # meaning of k/happy/n/max_segment_size
        return self.DEFAULT_ENCODING_PARAMETERS
299
300     def connected_to_introducer(self):
301         if self.introducer_client:
302             return self.introducer_client.connected_to_introducer()
303         return False
304
    def get_renewal_secret(self):
        # lease-renewal secret derived from the persistent master lease secret
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        # lease-cancel secret derived from the persistent master lease secret
        return hashutil.my_cancel_secret_hash(self._lease_secret)
310
311     def debug_wait_for_client_connections(self, num_clients):
312         """Return a Deferred that fires (with None) when we have connections
313         to the given number of peers. Useful for tests that set up a
314         temporary test network and need to know when it is safe to proceed
315         with an upload or download."""
316         def _check():
317             current_clients = list(self.get_all_peerids())
318             return len(current_clients) >= num_clients
319         d = self.poll(_check, 0.5)
320         d.addCallback(lambda res: None)
321         return d
322
323
324     # these four methods are the primitives for creating filenodes and
325     # dirnodes. The first takes a URI and produces a filenode or (new-style)
326     # dirnode. The other three create brand-new filenodes/dirnodes.
327
328     def create_node_from_uri(self, u):
329         # this returns synchronously.
330         u = IURI(u)
331         u_s = u.to_string()
332         if u_s not in self._node_cache:
333             if IReadonlyNewDirectoryURI.providedBy(u):
334                 # new-style read-only dirnodes
335                 node = NewDirectoryNode(self).init_from_uri(u)
336             elif INewDirectoryURI.providedBy(u):
337                 # new-style dirnodes
338                 node = NewDirectoryNode(self).init_from_uri(u)
339             elif IFileURI.providedBy(u):
340                 if isinstance(u, LiteralFileURI):
341                     node = LiteralFileNode(u, self) # LIT
342                 else:
343                     key = base32.b2a(u.storage_index)
344                     cachefile = self.download_cache.get_file(key)
345                     node = FileNode(u, self, cachefile) # CHK
346             else:
347                 assert IMutableFileURI.providedBy(u), u
348                 node = MutableFileNode(self).init_from_uri(u)
349             self._node_cache[u_s] = node
350         return self._node_cache[u_s]
351
352     def notify_publish(self, publish_status, size):
353         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
354                                                                size)
355     def notify_retrieve(self, retrieve_status):
356         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
357     def notify_mapupdate(self, update_status):
358         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
359
360     def create_empty_dirnode(self):
361         n = NewDirectoryNode(self)
362         d = n.create(self._generate_pubprivkeys)
363         d.addCallback(lambda res: n)
364         return d
365
366     def create_mutable_file(self, contents=""):
367         n = MutableFileNode(self)
368         d = n.create(contents, self._generate_pubprivkeys)
369         d.addCallback(lambda res: n)
370         return d
371
372     def _generate_pubprivkeys(self, key_size):
373         if self._key_generator:
374             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
375             def make_key_objs((verifying_key, signing_key)):
376                 v = rsa.create_verifying_key_from_string(verifying_key)
377                 s = rsa.create_signing_key_from_string(signing_key)
378                 return v, s
379             d.addCallback(make_key_objs)
380             return d
381         else:
382             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
383             # secs
384             signer = rsa.generate(key_size)
385             verifier = signer.get_verifying_key()
386             return verifier, signer
387
388     def upload(self, uploadable):
389         uploader = self.getServiceNamed("uploader")
390         return uploader.upload(uploadable)
391
392
393     def list_all_upload_statuses(self):
394         uploader = self.getServiceNamed("uploader")
395         return uploader.list_all_upload_statuses()
396
397     def list_all_download_statuses(self):
398         downloader = self.getServiceNamed("downloader")
399         return downloader.list_all_download_statuses()
400
401     def list_all_mapupdate_statuses(self):
402         watcher = self.getServiceNamed("mutable-watcher")
403         return watcher.list_all_mapupdate_statuses()
404     def list_all_publish_statuses(self):
405         watcher = self.getServiceNamed("mutable-watcher")
406         return watcher.list_all_publish_statuses()
407     def list_all_retrieve_statuses(self):
408         watcher = self.getServiceNamed("mutable-watcher")
409         return watcher.list_all_retrieve_statuses()
410
411     def list_all_helper_statuses(self):
412         try:
413             helper = self.getServiceNamed("helper")
414         except KeyError:
415             return []
416         return helper.get_all_upload_statuses()
417