# tahoe-lafs/tahoe-lafs.git -- src/allmydata/client.py
# ftp server: initial implementation. Still needs unit tests, custom Twisted patches.
1
2 import os, stat, time, re, weakref
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.immutable.upload import Uploader
16 from allmydata.immutable.download import Downloader
17 from allmydata.immutable.filenode import FileNode, LiteralFileNode
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer.client import IntroducerClient
21 from allmydata.util import hashutil, base32, testutil
22 from allmydata.uri import LiteralFileURI
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable.node import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
29 KiB=1024
30 MiB=1024*KiB
31 GiB=1024*MiB
32 TiB=1024*GiB
33 PiB=1024*TiB
34
35 class StubClient(Referenceable):
36     implements(RIStubClient)
37
def _make_secret():
    """Create a fresh random secret: base32-encoded CRYPTO_VAL_SIZE random
    bytes, newline-terminated for storage as a one-line private config file."""
    raw = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return "%s\n" % base32.b2a(raw)
41 class Client(node.Node, testutil.PollMixin):
42     implements(IStatsProducer)
43
44     PORTNUMFILE = "client.port"
45     STOREDIR = 'storage'
46     NODETYPE = "client"
47     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
48
49     # This means that if a storage server treats me as though I were a
50     # 1.0.0 storage client, it will work as they expect.
51     OLDEST_SUPPORTED_VERSION = "1.0.0"
52
53     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
54     # is the number of shares required to reconstruct a file. 'desired' means
55     # that we will abort an upload unless we can allocate space for at least
56     # this many. 'total' is the total number of shares created by encoding.
57     # If everybody has room then this is is how many we will upload.
58     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
59                                    "happy": 7,
60                                    "n": 10,
61                                    "max_segment_size": 128*KiB,
62                                    }
63
64     def __init__(self, basedir="."):
65         node.Node.__init__(self, basedir)
66         self.started_timestamp = time.time()
67         self.logSource="Client"
68         self.init_introducer_client()
69         self.init_stats_provider()
70         self.init_lease_secret()
71         self.init_storage()
72         self.init_control()
73         if self.get_config("helper", "enabled", False, boolean=True):
74             self.init_helper()
75         self.init_client()
76         self._key_generator = None
77         key_gen_furl = self.get_config("client", "key_generator.furl", None)
78         if key_gen_furl:
79             self.init_key_gen(key_gen_furl)
80         # ControlServer and Helper are attached after Tub startup
81         self.init_ftp_server()
82
83         hotline_file = os.path.join(self.basedir,
84                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
85         if os.path.exists(hotline_file):
86             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
87             self.log("hotline file noticed (%ds old), starting timer" % age)
88             hotline = TimerService(1.0, self._check_hotline, hotline_file)
89             hotline.setServiceParent(self)
90
91         webport = self.get_config("node", "web.port", None)
92         if webport:
93             self.init_web(webport) # strports string
94
95     def read_old_config_files(self):
96         node.Node.read_old_config_files(self)
97         copy = self._copy_config_from_file
98         copy("introducer.furl", "client", "introducer.furl")
99         copy("helper.furl", "client", "helper.furl")
100         copy("key_generator.furl", "client", "key_generator.furl")
101         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
102         if os.path.exists(os.path.join(self.basedir, "no_storage")):
103             self.set_config("storage", "enabled", "false")
104         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
105             self.set_config("storage", "readonly", "true")
106         copy("sizelimit", "storage", "sizelimit")
107         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
108             self.set_config("storage", "debug_discard", "true")
109         if os.path.exists(os.path.join(self.basedir, "run_helper")):
110             self.set_config("helper", "enabled", "true")
111
112     def init_introducer_client(self):
113         self.introducer_furl = self.get_config("client", "introducer.furl")
114         ic = IntroducerClient(self.tub, self.introducer_furl,
115                               self.nickname,
116                               str(allmydata.__version__),
117                               str(self.OLDEST_SUPPORTED_VERSION))
118         self.introducer_client = ic
119         # hold off on starting the IntroducerClient until our tub has been
120         # started, so we'll have a useful address on our RemoteReference, so
121         # that the introducer's status page will show us.
122         d = self.when_tub_ready()
123         def _start_introducer_client(res):
124             ic.setServiceParent(self)
125             # nodes that want to upload and download will need storage servers
126             ic.subscribe_to("storage")
127         d.addCallback(_start_introducer_client)
128         d.addErrback(log.err, facility="tahoe.init",
129                      level=log.BAD, umid="URyI5w")
130
131     def init_stats_provider(self):
132         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
133         self.stats_provider = StatsProvider(self, gatherer_furl)
134         self.add_service(self.stats_provider)
135         self.stats_provider.register_producer(self)
136
137     def get_stats(self):
138         return { 'node.uptime': time.time() - self.started_timestamp }
139
140     def init_lease_secret(self):
141         secret_s = self.get_or_create_private_config("secret", _make_secret)
142         self._lease_secret = base32.a2b(secret_s)
143
144     def init_storage(self):
145         # should we run a storage server (and publish it for others to use)?
146         if not self.get_config("storage", "enabled", True, boolean=True):
147             return
148         readonly = self.get_config("storage", "readonly", False, boolean=True)
149
150         storedir = os.path.join(self.basedir, self.STOREDIR)
151
152         sizelimit = None
153         data = self.get_config("storage", "sizelimit", None)
154         if data:
155             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
156             if not m:
157                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
158             else:
159                 number, suffix = m.groups()
160                 suffix = suffix.upper()
161                 if suffix.endswith("B"):
162                     suffix = suffix[:-1]
163                 multiplier = {"": 1,
164                               "K": 1000,
165                               "M": 1000 * 1000,
166                               "G": 1000 * 1000 * 1000,
167                               }[suffix]
168                 sizelimit = int(number) * multiplier
169         discard = self.get_config("storage", "debug_discard", False,
170                                   boolean=True)
171         ss = StorageServer(storedir, sizelimit, discard, readonly,
172                            self.stats_provider)
173         self.add_service(ss)
174         d = self.when_tub_ready()
175         # we can't do registerReference until the Tub is ready
176         def _publish(res):
177             furl_file = os.path.join(self.basedir, "private", "storage.furl")
178             furl = self.tub.registerReference(ss, furlFile=furl_file)
179             ri_name = RIStorageServer.__remote_name__
180             self.introducer_client.publish(furl, "storage", ri_name)
181         d.addCallback(_publish)
182         d.addErrback(log.err, facility="tahoe.init",
183                      level=log.BAD, umid="aLGBKw")
184
185     def init_client(self):
186         helper_furl = self.get_config("client", "helper.furl", None)
187         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
188         self.convergence = base32.a2b(convergence_s)
189         self._node_cache = weakref.WeakValueDictionary() # uri -> node
190         self.add_service(Uploader(helper_furl, self.stats_provider))
191         self.add_service(Downloader(self.stats_provider))
192         self.add_service(MutableWatcher(self.stats_provider))
193         def _publish(res):
194             # we publish an empty object so that the introducer can count how
195             # many clients are connected and see what versions they're
196             # running.
197             sc = StubClient()
198             furl = self.tub.registerReference(sc)
199             ri_name = RIStubClient.__remote_name__
200             self.introducer_client.publish(furl, "stub_client", ri_name)
201         d = self.when_tub_ready()
202         d.addCallback(_publish)
203         d.addErrback(log.err, facility="tahoe.init",
204                      level=log.BAD, umid="OEHq3g")
205
206     def init_control(self):
207         d = self.when_tub_ready()
208         def _publish(res):
209             c = ControlServer()
210             c.setServiceParent(self)
211             control_url = self.tub.registerReference(c)
212             self.write_private_config("control.furl", control_url + "\n")
213         d.addCallback(_publish)
214         d.addErrback(log.err, facility="tahoe.init",
215                      level=log.BAD, umid="d3tNXA")
216
217     def init_helper(self):
218         d = self.when_tub_ready()
219         def _publish(self):
220             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
221             h.setServiceParent(self)
222             # TODO: this is confusing. BASEDIR/private/helper.furl is created
223             # by the helper. BASEDIR/helper.furl is consumed by the client
224             # who wants to use the helper. I like having the filename be the
225             # same, since that makes 'cp' work smoothly, but the difference
226             # between config inputs and generated outputs is hard to see.
227             helper_furlfile = os.path.join(self.basedir,
228                                            "private", "helper.furl")
229             self.tub.registerReference(h, furlFile=helper_furlfile)
230         d.addCallback(_publish)
231         d.addErrback(log.err, facility="tahoe.init",
232                      level=log.BAD, umid="K0mW5w")
233
234     def init_key_gen(self, key_gen_furl):
235         d = self.when_tub_ready()
236         def _subscribe(self):
237             self.tub.connectTo(key_gen_furl, self._got_key_generator)
238         d.addCallback(_subscribe)
239         d.addErrback(log.err, facility="tahoe.init",
240                      level=log.BAD, umid="z9DMzw")
241
242     def _got_key_generator(self, key_generator):
243         self._key_generator = key_generator
244         key_generator.notifyOnDisconnect(self._lost_key_generator)
245
246     def _lost_key_generator(self):
247         self._key_generator = None
248
249     def init_web(self, webport):
250         self.log("init_web(webport=%s)", args=(webport,))
251
252         from allmydata.webish import WebishServer
253         nodeurl_path = os.path.join(self.basedir, "node.url")
254         ws = WebishServer(webport, nodeurl_path)
255         self.add_service(ws)
256
257     def init_ftp_server(self):
258         if not self.get_config("ftpd", "enabled", False, boolean=True):
259             return
260         portstr = self.get_config("ftpd", "ftp.port", "8021")
261         accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
262         accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
263
264         from allmydata import ftpd
265         s = ftpd.FTPServer(self, portstr, accountfile, accounturl)
266         s.setServiceParent(self)
267
268     def _check_hotline(self, hotline_file):
269         if os.path.exists(hotline_file):
270             mtime = os.stat(hotline_file)[stat.ST_MTIME]
271             if mtime > time.time() - 20.0:
272                 return
273             else:
274                 self.log("hotline file too old, shutting down")
275         else:
276             self.log("hotline file missing, shutting down")
277         reactor.stop()
278
279     def get_all_peerids(self):
280         return self.introducer_client.get_all_peerids()
281     def get_nickname_for_peerid(self, peerid):
282         return self.introducer_client.get_nickname_for_peerid(peerid)
283
284     def get_permuted_peers(self, service_name, key):
285         """
286         @return: list of (peerid, connection,)
287         """
288         assert isinstance(service_name, str)
289         assert isinstance(key, str)
290         return self.introducer_client.get_permuted_peers(service_name, key)
291
292     def get_encoding_parameters(self):
293         return self.DEFAULT_ENCODING_PARAMETERS
294
295     def connected_to_introducer(self):
296         if self.introducer_client:
297             return self.introducer_client.connected_to_introducer()
298         return False
299
300     def get_renewal_secret(self):
301         return hashutil.my_renewal_secret_hash(self._lease_secret)
302
303     def get_cancel_secret(self):
304         return hashutil.my_cancel_secret_hash(self._lease_secret)
305
306     def debug_wait_for_client_connections(self, num_clients):
307         """Return a Deferred that fires (with None) when we have connections
308         to the given number of peers. Useful for tests that set up a
309         temporary test network and need to know when it is safe to proceed
310         with an upload or download."""
311         def _check():
312             current_clients = list(self.get_all_peerids())
313             return len(current_clients) >= num_clients
314         d = self.poll(_check, 0.5)
315         d.addCallback(lambda res: None)
316         return d
317
318
319     # these four methods are the primitives for creating filenodes and
320     # dirnodes. The first takes a URI and produces a filenode or (new-style)
321     # dirnode. The other three create brand-new filenodes/dirnodes.
322
323     def create_node_from_uri(self, u):
324         # this returns synchronously.
325         u = IURI(u)
326         u_s = u.to_string()
327         if u_s not in self._node_cache:
328             if IReadonlyNewDirectoryURI.providedBy(u):
329                 # new-style read-only dirnodes
330                 node = NewDirectoryNode(self).init_from_uri(u)
331             elif INewDirectoryURI.providedBy(u):
332                 # new-style dirnodes
333                 node = NewDirectoryNode(self).init_from_uri(u)
334             elif IFileURI.providedBy(u):
335                 if isinstance(u, LiteralFileURI):
336                     node = LiteralFileNode(u, self) # LIT
337                 else:
338                     node = FileNode(u, self) # CHK
339             else:
340                 assert IMutableFileURI.providedBy(u), u
341                 node = MutableFileNode(self).init_from_uri(u)
342             self._node_cache[u_s] = node
343         return self._node_cache[u_s]
344
345     def notify_publish(self, publish_status, size):
346         self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
347                                                                size)
348     def notify_retrieve(self, retrieve_status):
349         self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
350     def notify_mapupdate(self, update_status):
351         self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
352
353     def create_empty_dirnode(self):
354         n = NewDirectoryNode(self)
355         d = n.create(self._generate_pubprivkeys)
356         d.addCallback(lambda res: n)
357         return d
358
359     def create_mutable_file(self, contents=""):
360         n = MutableFileNode(self)
361         d = n.create(contents, self._generate_pubprivkeys)
362         d.addCallback(lambda res: n)
363         return d
364
365     def _generate_pubprivkeys(self, key_size):
366         if self._key_generator:
367             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
368             def make_key_objs((verifying_key, signing_key)):
369                 v = rsa.create_verifying_key_from_string(verifying_key)
370                 s = rsa.create_signing_key_from_string(signing_key)
371                 return v, s
372             d.addCallback(make_key_objs)
373             return d
374         else:
375             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
376             # secs
377             signer = rsa.generate(key_size)
378             verifier = signer.get_verifying_key()
379             return verifier, signer
380
381     def upload(self, uploadable):
382         uploader = self.getServiceNamed("uploader")
383         return uploader.upload(uploadable)
384
385
386     def list_all_upload_statuses(self):
387         uploader = self.getServiceNamed("uploader")
388         return uploader.list_all_upload_statuses()
389
390     def list_all_download_statuses(self):
391         downloader = self.getServiceNamed("downloader")
392         return downloader.list_all_download_statuses()
393
394     def list_all_mapupdate_statuses(self):
395         watcher = self.getServiceNamed("mutable-watcher")
396         return watcher.list_all_mapupdate_statuses()
397     def list_all_publish_statuses(self):
398         watcher = self.getServiceNamed("mutable-watcher")
399         return watcher.list_all_publish_statuses()
400     def list_all_retrieve_statuses(self):
401         watcher = self.getServiceNamed("mutable-watcher")
402         return watcher.list_all_retrieve_statuses()
403
404     def list_all_helper_statuses(self):
405         try:
406             helper = self.getServiceNamed("helper")
407         except KeyError:
408             return []
409         return helper.get_all_upload_statuses()
410