import os, stat, time, re
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap import Referenceable
from foolscap.logging import log
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage import StorageServer
from allmydata.upload import Uploader
from allmydata.download import Downloader
from allmydata.checker import Checker
from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer import IntroducerClient
from allmydata.util import hashutil, base32, testutil
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient

KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

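# A do-nothing Referenceable: init_client() publishes one of these under
# "stub_client" so that the introducer can count connected clients and see
# what versions they are running.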
class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
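    """Return a fresh random secret: CRYPTO_VAL_SIZE bytes from os.urandom,
    base32-encoded, with a trailing newline so it can be written directly to
    a private config file."""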
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class Client(node.Node, testutil.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # we're pretty narrow-minded right now
    OLDEST_SUPPORTED_VERSION = allmydata.__version__

    # These are the default encoding parameters, a dict of {k, happy, n,
    # max_segment_size}. 'k' (needed) is the number of shares required to
    # reconstruct a file. 'happy' (desired) means that we will abort an
    # upload unless we can allocate space for at least this many shares.
    # 'n' (total) is the total number of shares created by encoding. If
    # everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
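    # For example, with k=3 and n=10 an upload is expanded to roughly 10/3
    # (about 3.3x) of its original size across the grid, and any 3 of the
    # 10 shares are sufficient to reconstruct the file.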

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        run_helper = self.get_config("run_helper")
        if run_helper:
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config('key_generator.furl')
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("webport")
        if webport:
            self.init_web(webport) # strports string

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("introducer.furl", required=True)
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so that we'll have a useful address on our
        # RemoteReference and the introducer's status page can show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
            # nodes that want to upload and download will need storage servers
            ic.subscribe_to("storage")
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_stats_provider(self):
        gatherer_furl = self.get_config('stats_gatherer.furl')
        if gatherer_furl:
            self.stats_provider = StatsProvider(self, gatherer_furl)
            self.add_service(self.stats_provider)
            self.stats_provider.register_producer(self)
        else:
            self.stats_provider = None

    def get_stats(self):
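        """IStatsProducer hook: the StatsProvider we registered with in
        init_stats_provider() collects these values along with the rest of
        the node's stats."""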
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
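        """Load or create the node-wide lease secret; the renewal and cancel
        secrets (see get_renewal_secret / get_cancel_secret below) are
        derived from it."""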
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        provide_storage = (self.get_config("no_storage") is None)
        if not provide_storage:
            return
        readonly_storage = (self.get_config("readonly_storage") is not None)

        storedir = os.path.join(self.basedir, self.STOREDIR)
        sizelimit = None

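        # The optional "sizelimit" config accepts a decimal suffix, e.g.
        # "100MB" or "5G"; suffixes are powers of 1000, not 1024.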
        data = self.get_config("sizelimit")
        if data:
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
            if not m:
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            else:
                number, suffix = m.groups()
                suffix = suffix.upper()
                if suffix.endswith("B"):
                    suffix = suffix[:-1]
                multiplier = {"": 1,
                              "K": 1000,
                              "M": 1000 * 1000,
                              "G": 1000 * 1000 * 1000,
                              }[suffix]
                sizelimit = int(number) * multiplier
        discard_storage = self.get_config("debug_discard_storage") is not None
        ss = StorageServer(storedir, sizelimit,
                           discard_storage, readonly_storage,
                           self.stats_provider)
        self.add_service(ss)
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_client(self):
        helper_furl = self.get_config("helper.furl")
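        # The convergence secret salts convergent encryption: the same
        # plaintext uploaded twice by this node yields the same CHK, while
        # nodes with different secrets produce unrelated URIs.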
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self.add_service(Uploader(helper_furl, self.stats_provider))
        self.add_service(Downloader(self.stats_provider))
        self.add_service(Checker())
        self.add_service(MutableWatcher(self.stats_provider))
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(res):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(res):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def _got_key_generator(self, key_generator):
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))
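        # webport is a Twisted strports description, e.g. "8123" or
        # "tcp:8123:interface=127.0.0.1".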

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        if self.get_config("webport_allow_localfile") is not None:
            ws.allow_local_access(True)
        self.add_service(ws)

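    # The hotline file is a dead-man's switch (used by the test harness,
    # for instance): as long as something keeps touching it at least every
    # 20 seconds the node stays up; once it goes stale or disappears, the
    # node shuts itself down.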
    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()

    def get_all_peerids(self):
        return self.introducer_client.get_all_peerids()

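    # Peer selection: the introducer client hashes each peerid together with
    # the given key (typically a storage index), giving every file a stable,
    # pseudo-random ordering of the storage servers.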
    def get_permuted_peers(self, service_name, key):
        """
        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these three methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other two create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, u):
        # this returns synchronously.
        u = IURI(u)
        if IReadonlyNewDirectoryURI.providedBy(u):
            # new-style read-only dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if INewDirectoryURI.providedBy(u):
            # new-style dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if IFileURI.providedBy(u):
            # CHK
            return FileNode(u, self)
        assert IMutableFileURI.providedBy(u), u
        return MutableFileNode(self).init_from_uri(u)

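    # Mutable-file operations report their status here (including, for
    # publishes, the number of bytes written); we forward the reports to the
    # MutableWatcher service, which aggregates them for the stats provider.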
    def notify_publish(self, publish_status, size):
        self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
                                                               size)
    def notify_retrieve(self, retrieve_status):
        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
    def notify_mapupdate(self, update_status):
        self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)

    def create_empty_dirnode(self):
        n = NewDirectoryNode(self)
        d = n.create(self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

    def create_mutable_file(self, contents=""):
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys)
        d.addCallback(lambda res: n)
        return d

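    # RSA keypairs either come from the remote key-generator service (if
    # key_generator.furl is configured) or are generated locally, which can
    # block the reactor for a second or more.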
    def _generate_pubprivkeys(self, key_size):
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)


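    # These pass-throughs let other components (the web status pages, for
    # instance) enumerate the recent-operation history kept by each service.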
    def list_all_upload_statuses(self):
        uploader = self.getServiceNamed("uploader")
        return uploader.list_all_upload_statuses()

    def list_all_download_statuses(self):
        downloader = self.getServiceNamed("downloader")
        return downloader.list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        watcher = self.getServiceNamed("mutable-watcher")
        return watcher.list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()