]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
use added secret to protect convergent encryption
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11
12 import allmydata
13 from allmydata.storage import StorageServer
14 from allmydata.upload import Uploader
15 from allmydata.download import Downloader
16 from allmydata.checker import Checker
17 from allmydata.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer import IntroducerClient
20 from allmydata.util import hashutil, base32, testutil
21 from allmydata.filenode import FileNode
22 from allmydata.dirnode import NewDirectoryNode
23 from allmydata.mutable import MutableFileNode, MutableWatcher
24 from allmydata.stats import StatsProvider
25 from allmydata.interfaces import IURI, INewDirectoryURI, \
26      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
27
# Binary (power-of-1024) size units, expressed in bytes.
KiB = 2 ** 10
MiB = 2 ** 20
GiB = 2 ** 30
TiB = 2 ** 40
PiB = 2 ** 50
33
class StubClient(Referenceable):
    """Empty remotely-referenceable object that declares RIStubClient.

    It defines no methods of its own; Client.init_client registers an
    instance with the Tub and publishes its FURL as "stub_client".
    """
    implements(RIStubClient)
36
def _make_secret():
    """Return a newly generated secret as a base32 string ending in a newline.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes read from os.urandom and
    encoded with base32.b2a.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + "\n"
39
class Client(node.Node, testutil.PollMixin):
    """A client node: sets up the introducer client, optional stats
    provider, optional storage server, control server, optional helper,
    upload/download/checker/mutable-watcher services and optional web
    server, all driven by config values read from the node's base directory.
    """
    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # we're pretty narrow-minded right now
    OLDEST_SUPPORTED_VERSION = allmydata.__version__

    # this is a dict with the keys "k", "happy", "n" and "max_segment_size".
    # 'k' is the number of shares required to reconstruct a file. 'happy'
    # means that we will abort an upload unless we can allocate space for at
    # least this many. 'n' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    def __init__(self, basedir="."):
        """Initialize the node in 'basedir' and start all of its services.

        Optional pieces (helper, hotline timer, web server) are only set up
        when the corresponding config value or file is present.
        """
        node.Node.__init__(self, basedir)
        self.logSource="Client"
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        run_helper = self.get_config("run_helper")
        if run_helper:
            self.init_helper()
        self.init_client()
        # ControlServer and Helper are attached after Tub startup

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll the hotline file once per second; see _check_hotline
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("webport")
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        """Create the IntroducerClient from the (required) "introducer.furl"
        config value, attach it as a child service, and subscribe to
        "storage" announcements."""
        self.introducer_furl = self.get_config("introducer.furl", required=True)
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        ic.setServiceParent(self)
        # nodes that want to upload and download will need storage servers
        ic.subscribe_to("storage")

    def init_stats_provider(self):
        """Create a StatsProvider if "stats_gatherer.furl" is configured;
        otherwise set self.stats_provider to None."""
        gatherer_furl = self.get_config('stats_gatherer.furl')
        if gatherer_furl:
            self.stats_provider = StatsProvider(self, gatherer_furl)
            self.add_service(self.stats_provider)
        else:
            self.stats_provider = None

    def init_lease_secret(self):
        """Load the private "secret" config value and store its
        base32-decoded form in self._lease_secret.

        _make_secret is handed to get_or_create_private_config as the
        generator (presumably used only when no value exists yet).
        """
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        """Start a StorageServer and publish it, unless "no_storage" is set.

        The optional "sizelimit" config value is a decimal number with an
        optional K/M/G suffix (case-insensitive, optionally followed by
        "B"); suffixes are powers of 1000. An unparseable value is logged
        and treated as no limit.
        """
        # should we run a storage server (and publish it for others to use)?
        provide_storage = (self.get_config("no_storage") is None)
        if not provide_storage:
            return
        readonly_storage = (self.get_config("readonly_storage") is not None)

        storedir = os.path.join(self.basedir, self.STOREDIR)
        sizelimit = None

        data = self.get_config("sizelimit")
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                # "KB" and "K" mean the same thing: drop the trailing "B"
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard_storage = self.get_config("debug_discard_storage") is not None
        ss = StorageServer(storedir, sizelimit,
                           discard_storage, readonly_storage,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_client(self):
        """Load the convergence secret, add the Uploader, Downloader,
        Checker and MutableWatcher services, and publish a StubClient once
        the Tub is ready."""
        helper_furl = self.get_config("helper.furl")
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self.add_service(Uploader(helper_furl))
        self.add_service(Downloader())
        self.add_service(Checker())
        self.add_service(MutableWatcher())
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_control(self):
        """Once the Tub is ready, attach a ControlServer and write its FURL
        to the private config entry "control.furl"."""
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_helper(self):
        """Once the Tub is ready, attach a Helper rooted at BASEDIR/helper
        and register it, recording its FURL in BASEDIR/private/helper.furl."""
        d = self.when_tub_ready()
        # The callback argument is named 'res' (like the other _publish
        # callbacks in this class) so that it does not shadow the enclosing
        # 'self': the body must operate on this Client instance regardless
        # of what value the Deferred fires with.
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"))
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_web(self, webport):
        """Add a WebishServer listening on 'webport' (a strports string).

        Local file access is enabled on it when the
        "webport_allow_localfile" config value is present.
        """
        self.log("init_web(webport=%s)", args=(webport,))

        # imported here rather than at module level, so the web code is only
        # loaded when a webport is actually configured
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        if self.get_config("webport_allow_localfile") is not None:
            ws.allow_local_access(True)
        self.add_service(ws)

    def _check_hotline(self, hotline_file):
        """Timer callback: stop the reactor unless 'hotline_file' exists and
        was modified within the last 20 seconds."""
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_all_peerids(self):
        """Return the introducer client's view of all known peerids."""
        return self.introducer_client.get_all_peerids()

    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        """Return DEFAULT_ENCODING_PARAMETERS.

        Note that this is the dict object itself, not a copy.
        """
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        """Return the introducer client's connection status, or False when
        self.introducer_client is falsy."""
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        """Return the renewal secret derived from the lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Return the cancel secret derived from the lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        """Return the node object matching the type of URI 'u': a
        NewDirectoryNode, a FileNode, or a MutableFileNode."""
        # this returns synchronously.
        u = IURI(u)
        if IReadonlyNewDirectoryURI.providedBy(u):
            # new-style read-only dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if INewDirectoryURI.providedBy(u):
            # new-style dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if IFileURI.providedBy(u):
            # CHK
            return FileNode(u, self)
        # anything else must be a mutable-file URI
        assert IMutableFileURI.providedBy(u), u
        return MutableFileNode(self).init_from_uri(u)

    def notify_publish(self, p):
        """Forward 'p' to the "mutable-watcher" service's notify_publish."""
        self.getServiceNamed("mutable-watcher").notify_publish(p)
    def notify_retrieve(self, r):
        """Forward 'r' to the "mutable-watcher" service's notify_retrieve."""
        self.getServiceNamed("mutable-watcher").notify_retrieve(r)

    def create_empty_dirnode(self):
        """Create a new directory node; return a Deferred that fires with
        the NewDirectoryNode once its create() has completed."""
        n = NewDirectoryNode(self)
        d = n.create()
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        """Create a new mutable file holding 'contents'; return a Deferred
        that fires with the MutableFileNode once its create() has
        completed."""
        n = MutableFileNode(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
        return d

    def upload(self, uploadable):
        """Hand 'uploadable' to the "uploader" service and return its
        result."""
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


    # The list_* methods below each delegate to the same-named method of the
    # "uploader", "downloader" or "mutable-watcher" service.

    def list_all_uploads(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_uploads()
    def list_active_uploads(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_active_uploads()
    def list_recent_uploads(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_recent_uploads()

    def list_all_downloads(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_downloads()
    def list_active_downloads(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_active_downloads()
    def list_recent_downloads(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_recent_downloads()

    def list_all_publish(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish()
    def list_active_publish(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_active_publish()
    def list_recent_publish(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_recent_publish()

    def list_all_retrieve(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve()
    def list_active_retrieve(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_active_retrieve()
    def list_recent_retrieve(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_recent_retrieve()