# src/allmydata/client.py (tahoe-lafs)
# break storage.py into smaller pieces in storage/*.py . No behavioral changes.
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap import Referenceable
9 from foolscap.logging import log
10 from pycryptopp.publickey import rsa
11
12 import allmydata
13 from allmydata.storage.server import StorageServer
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.filenode import MutableFileNode
25 from allmydata.stats import StatsProvider
26 from allmydata.history import History
27 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
28      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
29
30 KiB=1024
31 MiB=1024*KiB
32 GiB=1024*MiB
33 TiB=1024*GiB
34 PiB=1024*TiB
35
class StubClient(Referenceable):
    # An empty remote object published to the introducer (see
    # init_stub_client below) so it can count connected clients and see
    # which versions they are running.
    implements(RIStubClient)
38
def _make_secret():
    """Generate a fresh base32-encoded random secret, newline-terminated
    for storage as a one-line private config file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return "%s\n" % base32.b2a(raw)
41
42 class Client(node.Node, pollmixin.PollMixin):
43     implements(IStatsProducer)
44
45     PORTNUMFILE = "client.port"
46     STOREDIR = 'storage'
47     NODETYPE = "client"
48     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
49
50     # This means that if a storage server treats me as though I were a
51     # 1.0.0 storage client, it will work as they expect.
52     OLDEST_SUPPORTED_VERSION = "1.0.0"
53
54     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
55     # is the number of shares required to reconstruct a file. 'desired' means
56     # that we will abort an upload unless we can allocate space for at least
57     # this many. 'total' is the total number of shares created by encoding.
58     # If everybody has room then this is how many we will upload.
59     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
60                                    "happy": 7,
61                                    "n": 10,
62                                    "max_segment_size": 128*KiB,
63                                    }
64
    def __init__(self, basedir="."):
        """Create a Tahoe client node rooted at 'basedir'.

        Initialization order is significant: the introducer client must
        exist before storage/helper services try to publish themselves,
        and several services defer their registration until the Tub is
        ready.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()  # used by get_stats() for node.uptime
        self.logSource="Client"
        # give this instance its own copy, so config overrides applied in
        # init_client() don't mutate the class-level defaults
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # If the 'suicide prevention hotline' file exists, poll it every
        # second; _check_hotline stops the reactor when the file goes
        # stale (>60s) or disappears.
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
97
98     def read_old_config_files(self):
99         node.Node.read_old_config_files(self)
100         copy = self._copy_config_from_file
101         copy("introducer.furl", "client", "introducer.furl")
102         copy("helper.furl", "client", "helper.furl")
103         copy("key_generator.furl", "client", "key_generator.furl")
104         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
105         if os.path.exists(os.path.join(self.basedir, "no_storage")):
106             self.set_config("storage", "enabled", "false")
107         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
108             self.set_config("storage", "readonly", "true")
109         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
110             self.set_config("storage", "debug_discard", "true")
111         if os.path.exists(os.path.join(self.basedir, "run_helper")):
112             self.set_config("helper", "enabled", "true")
113
    def init_introducer_client(self):
        """Create our IntroducerClient and subscribe to 'storage' announcements.

        [client]introducer.furl is required (get_config with no default
        is used). Actual startup is deferred until the Tub is ready.
        """
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")
132
133     def init_stats_provider(self):
134         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
135         self.stats_provider = StatsProvider(self, gatherer_furl)
136         self.add_service(self.stats_provider)
137         self.stats_provider.register_producer(self)
138
139     def get_stats(self):
140         return { 'node.uptime': time.time() - self.started_timestamp }
141
142     def init_lease_secret(self):
143         secret_s = self.get_or_create_private_config("secret", _make_secret)
144         self._lease_secret = base32.a2b(secret_s)
145
    def init_storage(self):
        """Start a StorageServer (unless disabled) and publish its FURL to
        the introducer once the Tub is ready."""
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        # [storage]reserved_space accepts abbreviated sizes (parsed by
        # parse_abbreviated_size); unparseable values are logged and
        # treated as no reservation (0)
        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)
        ss = StorageServer(storedir,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
181
    def init_client(self):
        """Set up client-side services: encoding-parameter overrides, the
        convergence secret, the weak node cache, History, Uploader,
        download cache, Downloader, and the stub client."""
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        # config overrides for the erasure-coding parameters
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        # weak cache: nodes are shared while referenced, dropped when not
        self._node_cache = weakref.WeakValueDictionary() # uri -> node
        self.add_service(History(self.stats_provider))
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.init_stub_client()
199
200     def init_stub_client(self):
201         def _publish(res):
202             # we publish an empty object so that the introducer can count how
203             # many clients are connected and see what versions they're
204             # running.
205             sc = StubClient()
206             furl = self.tub.registerReference(sc)
207             ri_name = RIStubClient.__remote_name__
208             self.introducer_client.publish(furl, "stub_client", ri_name)
209         d = self.when_tub_ready()
210         d.addCallback(_publish)
211         d.addErrback(log.err, facility="tahoe.init",
212                      level=log.BAD, umid="OEHq3g")
213
    def get_history(self):
        """Return the History service added in init_client()."""
        return self.getServiceNamed("history")
216
217     def init_control(self):
218         d = self.when_tub_ready()
219         def _publish(res):
220             c = ControlServer()
221             c.setServiceParent(self)
222             control_url = self.tub.registerReference(c)
223             self.write_private_config("control.furl", control_url + "\n")
224         d.addCallback(_publish)
225         d.addErrback(log.err, facility="tahoe.init",
226                      level=log.BAD, umid="d3tNXA")
227
228     def init_helper(self):
229         d = self.when_tub_ready()
230         def _publish(self):
231             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
232             h.setServiceParent(self)
233             # TODO: this is confusing. BASEDIR/private/helper.furl is created
234             # by the helper. BASEDIR/helper.furl is consumed by the client
235             # who wants to use the helper. I like having the filename be the
236             # same, since that makes 'cp' work smoothly, but the difference
237             # between config inputs and generated outputs is hard to see.
238             helper_furlfile = os.path.join(self.basedir,
239                                            "private", "helper.furl")
240             self.tub.registerReference(h, furlFile=helper_furlfile)
241         d.addCallback(_publish)
242         d.addErrback(log.err, facility="tahoe.init",
243                      level=log.BAD, umid="K0mW5w")
244
245     def init_key_gen(self, key_gen_furl):
246         d = self.when_tub_ready()
247         def _subscribe(self):
248             self.tub.connectTo(key_gen_furl, self._got_key_generator)
249         d.addCallback(_subscribe)
250         d.addErrback(log.err, facility="tahoe.init",
251                      level=log.BAD, umid="z9DMzw")
252
    def _got_key_generator(self, key_generator):
        # a remote key-generator connected: remember it, and arrange to
        # forget it again when the connection is lost
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
256
    def _lost_key_generator(self):
        # fall back to local key generation (see _generate_pubprivkeys)
        self._key_generator = None
259
    def get_servers(self, service_name):
        """ Return frozenset of (peerid, versioned-rref) """
        # delegates to the introducer client's view of announced peers
        assert isinstance(service_name, str)
        return self.introducer_client.get_peers(service_name)
264
265     def init_web(self, webport):
266         self.log("init_web(webport=%s)", args=(webport,))
267
268         from allmydata.webish import WebishServer
269         nodeurl_path = os.path.join(self.basedir, "node.url")
270         staticdir = self.get_config("node", "web.static", "public_html")
271         staticdir = os.path.expanduser(staticdir)
272         ws = WebishServer(webport, nodeurl_path, staticdir)
273         self.add_service(ws)
274
275     def init_ftp_server(self):
276         if self.get_config("ftpd", "enabled", False, boolean=True):
277             accountfile = self.get_config("ftpd", "accounts.file", None)
278             accounturl = self.get_config("ftpd", "accounts.url", None)
279             ftp_portstr = self.get_config("ftpd", "port", "8021")
280
281             from allmydata.frontends import ftpd
282             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
283             s.setServiceParent(self)
284
285     def init_sftp_server(self):
286         if self.get_config("sftpd", "enabled", False, boolean=True):
287             accountfile = self.get_config("sftpd", "accounts.file", None)
288             accounturl = self.get_config("sftpd", "accounts.url", None)
289             sftp_portstr = self.get_config("sftpd", "port", "8022")
290             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
291             privkey_file = self.get_config("sftpd", "host_privkey_file")
292
293             from allmydata.frontends import sftpd
294             s = sftpd.SFTPServer(self, accountfile, accounturl,
295                                  sftp_portstr, pubkey_file, privkey_file)
296             s.setServiceParent(self)
297
298     def _check_hotline(self, hotline_file):
299         if os.path.exists(hotline_file):
300             mtime = os.stat(hotline_file)[stat.ST_MTIME]
301             if mtime > time.time() - 60.0:
302                 return
303             else:
304                 self.log("hotline file too old, shutting down")
305         else:
306             self.log("hotline file missing, shutting down")
307         reactor.stop()
308
    def get_all_peerids(self):
        # delegate to the introducer client
        return self.introducer_client.get_all_peerids()
    def get_nickname_for_peerid(self, peerid):
        # delegate to the introducer client
        return self.introducer_client.get_nickname_for_peerid(peerid)
313
    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        # the introducer client permutes its peer list by 'key' (used for
        # server selection during upload/download)
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
321
    def get_encoding_parameters(self):
        """Return the encoding-parameter dict (k, happy, n,
        max_segment_size) currently in effect for this node."""
        return self.DEFAULT_ENCODING_PARAMETERS
324
325     def connected_to_introducer(self):
326         if self.introducer_client:
327             return self.introducer_client.connected_to_introducer()
328         return False
329
    def get_renewal_secret(self):
        """Derive this node's lease-renewal secret from the master lease
        secret (see init_lease_secret)."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)
332
    def get_cancel_secret(self):
        """Derive this node's lease-cancel secret from the master lease
        secret (see init_lease_secret)."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)
335
336     def debug_wait_for_client_connections(self, num_clients):
337         """Return a Deferred that fires (with None) when we have connections
338         to the given number of peers. Useful for tests that set up a
339         temporary test network and need to know when it is safe to proceed
340         with an upload or download."""
341         def _check():
342             current_clients = list(self.get_all_peerids())
343             return len(current_clients) >= num_clients
344         d = self.poll(_check, 0.5)
345         d.addCallback(lambda res: None)
346         return d
347
348
349     # these four methods are the primitives for creating filenodes and
350     # dirnodes. The first takes a URI and produces a filenode or (new-style)
351     # dirnode. The other three create brand-new filenodes/dirnodes.
352
353     def create_node_from_uri(self, u):
354         # this returns synchronously.
355         u = IURI(u)
356         u_s = u.to_string()
357         if u_s not in self._node_cache:
358             if IReadonlyNewDirectoryURI.providedBy(u):
359                 # new-style read-only dirnodes
360                 node = NewDirectoryNode(self).init_from_uri(u)
361             elif INewDirectoryURI.providedBy(u):
362                 # new-style dirnodes
363                 node = NewDirectoryNode(self).init_from_uri(u)
364             elif IFileURI.providedBy(u):
365                 if isinstance(u, LiteralFileURI):
366                     node = LiteralFileNode(u, self) # LIT
367                 else:
368                     key = base32.b2a(u.storage_index)
369                     cachefile = self.download_cache.get_file(key)
370                     node = FileNode(u, self, cachefile) # CHK
371             else:
372                 assert IMutableFileURI.providedBy(u), u
373                 node = MutableFileNode(self).init_from_uri(u)
374             self._node_cache[u_s] = node
375         return self._node_cache[u_s]
376
377     def create_empty_dirnode(self):
378         n = NewDirectoryNode(self)
379         d = n.create(self._generate_pubprivkeys)
380         d.addCallback(lambda res: n)
381         return d
382
383     def create_mutable_file(self, contents=""):
384         n = MutableFileNode(self)
385         d = n.create(contents, self._generate_pubprivkeys)
386         d.addCallback(lambda res: n)
387         return d
388
389     def _generate_pubprivkeys(self, key_size):
390         if self._key_generator:
391             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
392             def make_key_objs((verifying_key, signing_key)):
393                 v = rsa.create_verifying_key_from_string(verifying_key)
394                 s = rsa.create_signing_key_from_string(signing_key)
395                 return v, s
396             d.addCallback(make_key_objs)
397             return d
398         else:
399             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
400             # secs
401             signer = rsa.generate(key_size)
402             verifier = signer.get_verifying_key()
403             return verifier, signer
404
    def upload(self, uploadable):
        """Hand 'uploadable' to the Uploader service, recording status in
        the History service; returns whatever the uploader returns."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
408
409
    def list_all_upload_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_upload_statuses()
412
    def list_all_download_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_download_statuses()
415
    def list_all_mapupdate_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_retrieve_statuses()
422
423     def list_all_helper_statuses(self):
424         try:
425             helper = self.getServiceNamed("helper")
426         except KeyError:
427             return []
428         return helper.get_all_upload_statuses()