]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
tahoe.cfg: add controls for k and N (and shares-of-happiness)
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, pollmixin, cachedir
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
# Power-of-two size multipliers (IEC binary units), used for the
# default max_segment_size in Client.DEFAULT_ENCODING_PARAMETERS.
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
34
class StubClient(Referenceable):
    # An empty Referenceable published under the "stub_client" service
    # name (see Client.init_client) so the introducer can count
    # connected clients and see what versions they are running.
    implements(RIStubClient)
37
def _make_secret():
    """Generate a fresh secret: CRYPTO_VAL_SIZE random bytes,
    base32-encoded, terminated with a newline so it round-trips
    cleanly through a private config file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(raw)
    return encoded + "\n"
40
class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node.

    On construction this reads configuration from its base directory,
    connects to the introducer, optionally publishes a storage server
    and/or an upload helper, and starts the uploader/downloader/mutable
    services plus any enabled web/FTP/SFTP frontends. It also provides
    the factory methods for creating filenodes and dirnodes from URIs.
    """
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    # If this file exists in the basedir and stops being touched for
    # 20s, the node shuts itself down (see __init__ / _check_hotline).
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dict of default erasure-coding parameters, overridable
    # from tahoe.cfg in init_client(). 'k' is the number of shares
    # required to reconstruct a file. 'happy' means that we will abort
    # an upload unless we can allocate space for at least this many
    # shares. 'n' is the total number of shares created by encoding;
    # if everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        """Read config from 'basedir' and start all client sub-services."""
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # copy the class-level dict so per-instance config overrides in
        # init_client() don't mutate the shared class attribute
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # If the hotline file exists, poll it every second and shut the
        # node down once it goes stale (test harness support).
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def read_old_config_files(self):
        """Migrate pre-tahoe.cfg per-item config files into the new
        sectioned configuration (e.g. BASEDIR/no_storage disables the
        storage server)."""
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        copy("sizelimit", "storage", "sizelimit")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")

    def init_introducer_client(self):
        """Create the IntroducerClient and arrange for it to start (and
        subscribe to the "storage" service) once our Tub is ready."""
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        """Start the StatsProvider service and register ourselves as a
        producer (our get_stats() feeds it node.uptime)."""
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        """IStatsProducer: report seconds since this node started."""
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
        """Load (or create on first run) the private lease secret from
        which the renewal and cancel secrets are derived."""
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        """Start a StorageServer if [storage]enabled, and publish its
        FURL to the introducer once the Tub is ready."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        # parse an optional size limit like "5g" or "100MB"; note the
        # suffixes are decimal (K=1000), not binary
        sizelimit = None
        data = self.get_config("storage", "sizelimit", None)
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir, sizelimit, discard, readonly,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")

    def init_client(self):
        """Apply tahoe.cfg encoding-parameter overrides, load/create the
        convergence secret, and start the Uploader, Downloader, download
        cache, and MutableWatcher services. Also publishes an empty
        StubClient so the introducer can count us."""
        helper_furl = self.get_config("client", "helper.furl", None)
        # override the per-instance copy of DEFAULT_ENCODING_PARAMETERS
        # with any shares.* settings from tahoe.cfg
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def init_control(self):
        """Once the Tub is ready, start a ControlServer and write its
        FURL to BASEDIR/private/control.furl."""
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        """Once the Tub is ready, start an upload Helper and register it
        under BASEDIR/private/helper.furl."""
        d = self.when_tub_ready()
        # NOTE(review): this callback's parameter is named 'self',
        # shadowing the method's 'self'. It only works if
        # when_tub_ready() fires its Deferred with this Client instance
        # -- confirm against node.py; renaming the parameter to 'res'
        # would be clearer either way.
        def _publish(self):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        """Once the Tub is ready, connect to the remote key-generator
        service at 'key_gen_furl' (used by _generate_pubprivkeys)."""
        d = self.when_tub_ready()
        # NOTE(review): same shadowed-'self' pattern as in init_helper's
        # _publish; works only if when_tub_ready() fires with the Client
        # instance -- confirm.
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        # remember the remote reference, and clear it on disconnect so
        # _generate_pubprivkeys falls back to local generation
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def init_web(self, webport):
        """Start the web frontend listening on 'webport' (a Twisted
        strports description string)."""
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(webport, nodeurl_path, staticdir)
        self.add_service(ws)

    def init_ftp_server(self):
        """Start the FTP frontend if [ftpd]enabled (default port 8021)."""
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        """Start the SFTP frontend if [sftpd]enabled (default port 8022);
        requires host_pubkey_file and host_privkey_file settings."""
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def _check_hotline(self, hotline_file):
        """TimerService callback (every 1.0s): stop the reactor if the
        hotline file is missing or has not been touched for 20 seconds."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_all_peerids(self):
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        return self.introducer_client.get_nickname_for_peerid(peerid)

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        # this is the per-instance copy, already adjusted by init_client()
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        """Return the filenode/dirnode for URI 'u', creating and caching
        it (weakly, keyed by URI string) on first use."""
        # this returns synchronously.
        u = IURI(u)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyNewDirectoryURI.providedBy(u):
                # new-style read-only dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif INewDirectoryURI.providedBy(u):
                # new-style dirnodes
                node = NewDirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    key = base32.b2a(u.storage_index)
                    cachefile = self.download_cache.get_file(key)
                    node = FileNode(u, self, cachefile) # CHK
            else:
                assert IMutableFileURI.providedBy(u), u
                node = MutableFileNode(self).init_from_uri(u)
            self._node_cache[u_s] = node
        return self._node_cache[u_s]

    # status-notification hooks, forwarded to the MutableWatcher service
    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)

    def create_empty_dirnode(self):
        """Create a brand-new empty dirnode; returns a Deferred that
        fires with the NewDirectoryNode."""
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        """Create a brand-new mutable file holding 'contents'; returns a
        Deferred that fires with the MutableFileNode."""
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def _generate_pubprivkeys(self, key_size):
        """Produce an RSA (verifying_key, signing_key) pair, either via
        the remote key generator (returns a Deferred) or locally
        (returns the tuple synchronously)."""
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


    def list_all_upload_statuses(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
        # the helper service only exists if [helper]enabled was set
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()
436