]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
stats: added stats reporting to the upload helper
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
11 from pycryptopp.publickey import rsa
12
13 import allmydata
14 from allmydata.storage import StorageServer
15 from allmydata.upload import Uploader
16 from allmydata.download import Downloader
17 from allmydata.checker import Checker
18 from allmydata.offloaded import Helper
19 from allmydata.control import ControlServer
20 from allmydata.introducer import IntroducerClient
21 from allmydata.util import hashutil, base32, testutil
22 from allmydata.filenode import FileNode
23 from allmydata.dirnode import NewDirectoryNode
24 from allmydata.mutable import MutableFileNode, MutableWatcher
25 from allmydata.stats import StatsProvider
26 from allmydata.interfaces import IURI, INewDirectoryURI, \
27      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
28
# Binary size units, in bytes: each one is 1024 times the previous.
KiB = 1 << 10
MiB = KiB << 10
GiB = MiB << 10
TiB = GiB << 10
PiB = TiB << 10
34
class StubClient(Referenceable):
    """Empty Referenceable that declares the RIStubClient interface.

    Client.init_client registers an instance of this class with the Tub and
    publishes its FURL as "stub_client", so that the introducer can count how
    many clients are connected and see what versions they are running.
    """
    implements(RIStubClient)
37
def _make_secret():
    """Return a fresh random secret as a base32 string ending in a newline.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes read from os.urandom.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + "\n"
40
41 class Client(node.Node, testutil.PollMixin):
42     PORTNUMFILE = "client.port"
43     STOREDIR = 'storage'
44     NODETYPE = "client"
45     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
46
47     # we're pretty narrow-minded right now
48     OLDEST_SUPPORTED_VERSION = allmydata.__version__
49
    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total), and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
55     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
56                                    "happy": 7,
57                                    "n": 10,
58                                    "max_segment_size": 128*KiB,
59                                    }
60
61     def __init__(self, basedir="."):
62         node.Node.__init__(self, basedir)
63         self.logSource="Client"
64         self.nickname = self.get_config("nickname")
65         if self.nickname is None:
66             self.nickname = "<unspecified>"
67         self.init_introducer_client()
68         self.init_stats_provider()
69         self.init_lease_secret()
70         self.init_storage()
71         self.init_control()
72         run_helper = self.get_config("run_helper")
73         if run_helper:
74             self.init_helper()
75         self.init_client()
76         self._key_generator = None
77         key_gen_furl = self.get_config('key_generator.furl')
78         if key_gen_furl:
79             self.init_key_gen(key_gen_furl)
80         # ControlServer and Helper are attached after Tub startup
81
82         hotline_file = os.path.join(self.basedir,
83                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
84         if os.path.exists(hotline_file):
85             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
86             self.log("hotline file noticed (%ds old), starting timer" % age)
87             hotline = TimerService(1.0, self._check_hotline, hotline_file)
88             hotline.setServiceParent(self)
89
90         webport = self.get_config("webport")
91         if webport:
92             self.init_web(webport) # strports string
93
94     def init_introducer_client(self):
95         self.introducer_furl = self.get_config("introducer.furl", required=True)
96         ic = IntroducerClient(self.tub, self.introducer_furl,
97                               self.nickname,
98                               str(allmydata.__version__),
99                               str(self.OLDEST_SUPPORTED_VERSION))
100         self.introducer_client = ic
101         ic.setServiceParent(self)
102         # nodes that want to upload and download will need storage servers
103         ic.subscribe_to("storage")
104
105     def init_stats_provider(self):
106         gatherer_furl = self.get_config('stats_gatherer.furl')
107         if gatherer_furl:
108             self.stats_provider = StatsProvider(self, gatherer_furl)
109             self.add_service(self.stats_provider)
110         else:
111             self.stats_provider = None
112
113     def init_lease_secret(self):
114         secret_s = self.get_or_create_private_config("secret", _make_secret)
115         self._lease_secret = base32.a2b(secret_s)
116
117     def init_storage(self):
118         # should we run a storage server (and publish it for others to use)?
119         provide_storage = (self.get_config("no_storage") is None)
120         if not provide_storage:
121             return
122         readonly_storage = (self.get_config("readonly_storage") is not None)
123
124         storedir = os.path.join(self.basedir, self.STOREDIR)
125         sizelimit = None
126
127         data = self.get_config("sizelimit")
128         if data:
129             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
130             if not m:
131                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
132             else:
133                 number, suffix = m.groups()
134                 suffix = suffix.upper()
135                 if suffix.endswith("B"):
136                     suffix = suffix[:-1]
137                 multiplier = {"": 1,
138                               "K": 1000,
139                               "M": 1000 * 1000,
140                               "G": 1000 * 1000 * 1000,
141                               }[suffix]
142                 sizelimit = int(number) * multiplier
143         discard_storage = self.get_config("debug_discard_storage") is not None
144         ss = StorageServer(storedir, sizelimit,
145                            discard_storage, readonly_storage,
146                            self.stats_provider)
147         self.add_service(ss)
148         d = self.when_tub_ready()
149         # we can't do registerReference until the Tub is ready
150         def _publish(res):
151             furl_file = os.path.join(self.basedir, "private", "storage.furl")
152             furl = self.tub.registerReference(ss, furlFile=furl_file)
153             ri_name = RIStorageServer.__remote_name__
154             self.introducer_client.publish(furl, "storage", ri_name)
155         d.addCallback(_publish)
156         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
157
158     def init_client(self):
159         helper_furl = self.get_config("helper.furl")
160         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
161         self.convergence = base32.a2b(convergence_s)
162         self.add_service(Uploader(helper_furl, self.stats_provider))
163         self.add_service(Downloader(self.stats_provider))
164         self.add_service(Checker())
165         self.add_service(MutableWatcher(self.stats_provider))
166         def _publish(res):
167             # we publish an empty object so that the introducer can count how
168             # many clients are connected and see what versions they're
169             # running.
170             sc = StubClient()
171             furl = self.tub.registerReference(sc)
172             ri_name = RIStubClient.__remote_name__
173             self.introducer_client.publish(furl, "stub_client", ri_name)
174         d = self.when_tub_ready()
175         d.addCallback(_publish)
176         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
177
178     def init_control(self):
179         d = self.when_tub_ready()
180         def _publish(res):
181             c = ControlServer()
182             c.setServiceParent(self)
183             control_url = self.tub.registerReference(c)
184             self.write_private_config("control.furl", control_url + "\n")
185         d.addCallback(_publish)
186         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
187
188     def init_helper(self):
189         d = self.when_tub_ready()
190         def _publish(self):
191             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
192             h.setServiceParent(self)
193             # TODO: this is confusing. BASEDIR/private/helper.furl is created
194             # by the helper. BASEDIR/helper.furl is consumed by the client
195             # who wants to use the helper. I like having the filename be the
196             # same, since that makes 'cp' work smoothly, but the difference
197             # between config inputs and generated outputs is hard to see.
198             helper_furlfile = os.path.join(self.basedir,
199                                            "private", "helper.furl")
200             self.tub.registerReference(h, furlFile=helper_furlfile)
201         d.addCallback(_publish)
202         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
203
204     def init_key_gen(self, key_gen_furl):
205         d = self.when_tub_ready()
206         def _subscribe(self):
207             self.tub.connectTo(key_gen_furl, self._got_key_generator)
208         d.addCallback(_subscribe)
209         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
210
211     def _got_key_generator(self, key_generator):
212         self._key_generator = key_generator
213         key_generator.notifyOnDisconnect(self._lost_key_generator)
214
215     def _lost_key_generator(self):
216         self._key_generator = None
217
218     def init_web(self, webport):
219         self.log("init_web(webport=%s)", args=(webport,))
220
221         from allmydata.webish import WebishServer
222         nodeurl_path = os.path.join(self.basedir, "node.url")
223         ws = WebishServer(webport, nodeurl_path)
224         if self.get_config("webport_allow_localfile") is not None:
225             ws.allow_local_access(True)
226         self.add_service(ws)
227
228     def _check_hotline(self, hotline_file):
229         if os.path.exists(hotline_file):
230             mtime = os.stat(hotline_file)[stat.ST_MTIME]
231             if mtime > time.time() - 20.0:
232                 return
233             else:
234                 self.log("hotline file too old, shutting down")
235         else:
236             self.log("hotline file missing, shutting down")
237         reactor.stop()
238
239     def get_all_peerids(self):
240         return self.introducer_client.get_all_peerids()
241
242     def get_permuted_peers(self, service_name, key):
243         """
244         @return: list of (peerid, connection,)
245         """
246         assert isinstance(service_name, str)
247         assert isinstance(key, str)
248         return self.introducer_client.get_permuted_peers(service_name, key)
249
250     def get_encoding_parameters(self):
251         return self.DEFAULT_ENCODING_PARAMETERS
252
253     def connected_to_introducer(self):
254         if self.introducer_client:
255             return self.introducer_client.connected_to_introducer()
256         return False
257
258     def get_renewal_secret(self):
259         return hashutil.my_renewal_secret_hash(self._lease_secret)
260
261     def get_cancel_secret(self):
262         return hashutil.my_cancel_secret_hash(self._lease_secret)
263
264     def debug_wait_for_client_connections(self, num_clients):
265         """Return a Deferred that fires (with None) when we have connections
266         to the given number of peers. Useful for tests that set up a
267         temporary test network and need to know when it is safe to proceed
268         with an upload or download."""
269         def _check():
270             current_clients = list(self.get_all_peerids())
271             return len(current_clients) >= num_clients
272         d = self.poll(_check, 0.5)
273         d.addCallback(lambda res: None)
274         return d
275
276
277     # these four methods are the primitives for creating filenodes and
278     # dirnodes. The first takes a URI and produces a filenode or (new-style)
279     # dirnode. The other three create brand-new filenodes/dirnodes.
280
281     def create_node_from_uri(self, u):
282         # this returns synchronously.
283         u = IURI(u)
284         if IReadonlyNewDirectoryURI.providedBy(u):
285             # new-style read-only dirnodes
286             return NewDirectoryNode(self).init_from_uri(u)
287         if INewDirectoryURI.providedBy(u):
288             # new-style dirnodes
289             return NewDirectoryNode(self).init_from_uri(u)
290         if IFileURI.providedBy(u):
291             # CHK
292             return FileNode(u, self)
293         assert IMutableFileURI.providedBy(u), u
294         return MutableFileNode(self).init_from_uri(u)
295
296     def notify_publish(self, p):
297         self.getServiceNamed("mutable-watcher").notify_publish(p)
298     def notify_retrieve(self, r):
299         self.getServiceNamed("mutable-watcher").notify_retrieve(r)
300
301     def create_empty_dirnode(self):
302         n = NewDirectoryNode(self)
303         d = n.create(self._generate_pubprivkeys)
304         d.addCallback(lambda res: n)
305         return d
306
307     def create_mutable_file(self, contents=""):
308         n = MutableFileNode(self)
309         d = n.create(contents, self._generate_pubprivkeys)
310         d.addCallback(lambda res: n)
311         return d
312
313     def _generate_pubprivkeys(self, key_size):
314         if self._key_generator:
315             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
316             def make_key_objs((verifying_key, signing_key)):
317                 v = rsa.create_verifying_key_from_string(verifying_key)
318                 s = rsa.create_signing_key_from_string(signing_key)
319                 return v, s
320             d.addCallback(make_key_objs)
321             return d
322         else:
323             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
324             signer = rsa.generate(key_size)
325             verifier = signer.get_verifying_key()
326             return verifier, signer
327
328     def upload(self, uploadable):
329         uploader = self.getServiceNamed("uploader")
330         return uploader.upload(uploadable)
331
332
333     def list_all_uploads(self):
334         uploader = self.getServiceNamed("uploader")
335         return uploader.list_all_uploads()
336     def list_active_uploads(self):
337         uploader = self.getServiceNamed("uploader")
338         return uploader.list_active_uploads()
339     def list_recent_uploads(self):
340         uploader = self.getServiceNamed("uploader")
341         return uploader.list_recent_uploads()
342
343     def list_all_downloads(self):
344         downloader = self.getServiceNamed("downloader")
345         return downloader.list_all_downloads()
346     def list_active_downloads(self):
347         downloader = self.getServiceNamed("downloader")
348         return downloader.list_active_downloads()
349     def list_recent_downloads(self):
350         downloader = self.getServiceNamed("downloader")
351         return downloader.list_recent_downloads()
352
353     def list_all_publish(self):
354         watcher = self.getServiceNamed("mutable-watcher")
355         return watcher.list_all_publish()
356     def list_active_publish(self):
357         watcher = self.getServiceNamed("mutable-watcher")
358         return watcher.list_active_publish()
359     def list_recent_publish(self):
360         watcher = self.getServiceNamed("mutable-watcher")
361         return watcher.list_recent_publish()
362
363     def list_all_retrieve(self):
364         watcher = self.getServiceNamed("mutable-watcher")
365         return watcher.list_all_retrieve()
366     def list_active_retrieve(self):
367         watcher = self.getServiceNamed("mutable-watcher")
368         return watcher.list_active_retrieve()
369     def list_recent_retrieve(self):
370         watcher = self.getServiceNamed("mutable-watcher")
371         return watcher.list_recent_retrieve()