]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
webish: add primitive publish/retrieve status pages
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.logging import log
9
10 import allmydata
11 from allmydata.storage import StorageServer
12 from allmydata.upload import Uploader
13 from allmydata.download import Downloader
14 from allmydata.checker import Checker
15 from allmydata.offloaded import Helper
16 from allmydata.control import ControlServer
17 from allmydata.introducer import IntroducerClient
18 from allmydata.util import hashutil, base32, testutil
19 from allmydata.filenode import FileNode
20 from allmydata.dirnode import NewDirectoryNode
21 from allmydata.mutable import MutableFileNode, MutableWatcher
22 from allmydata.stats import StatsProvider
23 from allmydata.interfaces import IURI, INewDirectoryURI, \
24      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
25
26 KiB=1024
27 MiB=1024*KiB
28 GiB=1024*MiB
29 TiB=1024*GiB
30 PiB=1024*TiB
31
32 class Client(node.Node, testutil.PollMixin):
33     PORTNUMFILE = "client.port"
34     STOREDIR = 'storage'
35     NODETYPE = "client"
36     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
37
38     # we're pretty narrow-minded right now
39     OLDEST_SUPPORTED_VERSION = allmydata.__version__
40
41     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
42     # is the number of shares required to reconstruct a file. 'desired' means
43     # that we will abort an upload unless we can allocate space for at least
44     # this many. 'total' is the total number of shares created by encoding.
45     # If everybody has room then this is is how many we will upload.
46     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
47                                    "happy": 7,
48                                    "n": 10,
49                                    "max_segment_size": 1*MiB,
50                                    }
51
52     def __init__(self, basedir="."):
53         node.Node.__init__(self, basedir)
54         self.logSource="Client"
55         self.nickname = self.get_config("nickname")
56         if self.nickname is None:
57             self.nickname = "<unspecified>"
58         self.init_introducer_client()
59         self.init_stats_provider()
60         self.init_lease_secret()
61         self.init_storage()
62         self.init_control()
63         run_helper = self.get_config("run_helper")
64         if run_helper:
65             self.init_helper()
66         helper_furl = self.get_config("helper.furl")
67         self.add_service(Uploader(helper_furl))
68         self.add_service(Downloader())
69         self.add_service(Checker())
70         self.add_service(MutableWatcher())
71         # ControlServer and Helper are attached after Tub startup
72
73         hotline_file = os.path.join(self.basedir,
74                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
75         if os.path.exists(hotline_file):
76             age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
77             self.log("hotline file noticed (%ds old), starting timer" % age)
78             hotline = TimerService(1.0, self._check_hotline, hotline_file)
79             hotline.setServiceParent(self)
80
81         webport = self.get_config("webport")
82         if webport:
83             self.init_web(webport) # strports string
84
85     def init_introducer_client(self):
86         self.introducer_furl = self.get_config("introducer.furl", required=True)
87         ic = IntroducerClient(self.tub, self.introducer_furl,
88                               self.nickname,
89                               str(allmydata.__version__),
90                               str(self.OLDEST_SUPPORTED_VERSION))
91         self.introducer_client = ic
92         ic.setServiceParent(self)
93         # nodes that want to upload and download will need storage servers
94         ic.subscribe_to("storage")
95
96     def init_stats_provider(self):
97         gatherer_furl = self.get_config('stats_gatherer.furl')
98         if gatherer_furl:
99             self.stats_provider = StatsProvider(self, gatherer_furl)
100             self.add_service(self.stats_provider)
101         else:
102             self.stats_provider = None
103
104     def init_lease_secret(self):
105         def make_secret():
106             return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
107         secret_s = self.get_or_create_private_config("secret", make_secret)
108         self._lease_secret = base32.a2b(secret_s)
109
110     def init_storage(self):
111         # should we run a storage server (and publish it for others to use)?
112         provide_storage = (self.get_config("no_storage") is None)
113         if not provide_storage:
114             return
115         readonly_storage = (self.get_config("readonly_storage") is not None)
116
117         storedir = os.path.join(self.basedir, self.STOREDIR)
118         sizelimit = None
119
120         data = self.get_config("sizelimit")
121         if data:
122             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
123             if not m:
124                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
125             else:
126                 number, suffix = m.groups()
127                 suffix = suffix.upper()
128                 if suffix.endswith("B"):
129                     suffix = suffix[:-1]
130                 multiplier = {"": 1,
131                               "K": 1000,
132                               "M": 1000 * 1000,
133                               "G": 1000 * 1000 * 1000,
134                               }[suffix]
135                 sizelimit = int(number) * multiplier
136         discard_storage = self.get_config("debug_discard_storage") is not None
137         ss = StorageServer(storedir, sizelimit,
138                            discard_storage, readonly_storage,
139                            self.stats_provider)
140         self.add_service(ss)
141         d = self.when_tub_ready()
142         # we can't do registerReference until the Tub is ready
143         def _publish(res):
144             furl_file = os.path.join(self.basedir, "private", "storage.furl")
145             furl = self.tub.registerReference(ss, furlFile=furl_file)
146             ri_name = RIStorageServer.__remote_name__
147             self.introducer_client.publish(furl, "storage", ri_name)
148         d.addCallback(_publish)
149         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
150
151     def init_control(self):
152         d = self.when_tub_ready()
153         def _publish(res):
154             c = ControlServer()
155             c.setServiceParent(self)
156             control_url = self.tub.registerReference(c)
157             self.write_private_config("control.furl", control_url + "\n")
158         d.addCallback(_publish)
159         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
160
161     def init_helper(self):
162         d = self.when_tub_ready()
163         def _publish(self):
164             h = Helper(os.path.join(self.basedir, "helper"))
165             h.setServiceParent(self)
166             # TODO: this is confusing. BASEDIR/private/helper.furl is created
167             # by the helper. BASEDIR/helper.furl is consumed by the client
168             # who wants to use the helper. I like having the filename be the
169             # same, since that makes 'cp' work smoothly, but the difference
170             # between config inputs and generated outputs is hard to see.
171             helper_furlfile = os.path.join(self.basedir,
172                                            "private", "helper.furl")
173             self.tub.registerReference(h, furlFile=helper_furlfile)
174         d.addCallback(_publish)
175         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
176
177     def init_web(self, webport):
178         self.log("init_web(webport=%s)", args=(webport,))
179
180         from allmydata.webish import WebishServer
181         nodeurl_path = os.path.join(self.basedir, "node.url")
182         ws = WebishServer(webport, nodeurl_path)
183         if self.get_config("webport_allow_localfile") is not None:
184             ws.allow_local_access(True)
185         self.add_service(ws)
186
187     def _check_hotline(self, hotline_file):
188         if os.path.exists(hotline_file):
189             mtime = os.stat(hotline_file)[stat.ST_MTIME]
190             if mtime > time.time() - 20.0:
191                 return
192             else:
193                 self.log("hotline file too old, shutting down")
194         else:
195             self.log("hotline file missing, shutting down")
196         reactor.stop()
197
198     def get_all_peerids(self):
199         return self.introducer_client.get_all_peerids()
200
201     def get_permuted_peers(self, service_name, key):
202         """
203         @return: list of (peerid, connection,)
204         """
205         assert isinstance(service_name, str)
206         assert isinstance(key, str)
207         return self.introducer_client.get_permuted_peers(service_name, key)
208
209     def get_encoding_parameters(self):
210         return self.DEFAULT_ENCODING_PARAMETERS
211
212     def connected_to_introducer(self):
213         if self.introducer_client:
214             return self.introducer_client.connected_to_introducer()
215         return False
216
217     def get_renewal_secret(self):
218         return hashutil.my_renewal_secret_hash(self._lease_secret)
219
220     def get_cancel_secret(self):
221         return hashutil.my_cancel_secret_hash(self._lease_secret)
222
223     def debug_wait_for_client_connections(self, num_clients):
224         """Return a Deferred that fires (with None) when we have connections
225         to the given number of peers. Useful for tests that set up a
226         temporary test network and need to know when it is safe to proceed
227         with an upload or download."""
228         def _check():
229             current_clients = list(self.get_all_peerids())
230             return len(current_clients) >= num_clients
231         d = self.poll(_check, 0.5)
232         d.addCallback(lambda res: None)
233         return d
234
235
236     # these four methods are the primitives for creating filenodes and
237     # dirnodes. The first takes a URI and produces a filenode or (new-style)
238     # dirnode. The other three create brand-new filenodes/dirnodes.
239
240     def create_node_from_uri(self, u):
241         # this returns synchronously.
242         u = IURI(u)
243         if IReadonlyNewDirectoryURI.providedBy(u):
244             # new-style read-only dirnodes
245             return NewDirectoryNode(self).init_from_uri(u)
246         if INewDirectoryURI.providedBy(u):
247             # new-style dirnodes
248             return NewDirectoryNode(self).init_from_uri(u)
249         if IFileURI.providedBy(u):
250             # CHK
251             return FileNode(u, self)
252         assert IMutableFileURI.providedBy(u), u
253         return MutableFileNode(self).init_from_uri(u)
254
255     def notify_publish(self, p):
256         self.getServiceNamed("mutable-watcher").notify_publish(p)
257     def notify_retrieve(self, r):
258         self.getServiceNamed("mutable-watcher").notify_retrieve(r)
259
260     def create_empty_dirnode(self):
261         n = NewDirectoryNode(self)
262         d = n.create()
263         d.addCallback(lambda res: n)
264         return d
265
266     def create_mutable_file(self, contents=""):
267         n = MutableFileNode(self)
268         d = n.create(contents)
269         d.addCallback(lambda res: n)
270         return d
271
272     def upload(self, uploadable):
273         uploader = self.getServiceNamed("uploader")
274         return uploader.upload(uploadable)
275
276
277     def list_all_uploads(self):
278         uploader = self.getServiceNamed("uploader")
279         return uploader.list_all_uploads()
280     def list_active_uploads(self):
281         uploader = self.getServiceNamed("uploader")
282         return uploader.list_active_uploads()
283     def list_recent_uploads(self):
284         uploader = self.getServiceNamed("uploader")
285         return uploader.list_recent_uploads()
286
287     def list_all_downloads(self):
288         downloader = self.getServiceNamed("downloader")
289         return downloader.list_all_downloads()
290     def list_active_downloads(self):
291         downloader = self.getServiceNamed("downloader")
292         return downloader.list_active_downloads()
293     def list_recent_downloads(self):
294         downloader = self.getServiceNamed("downloader")
295         return downloader.list_recent_downloads()
296
297     def list_all_publish(self):
298         watcher = self.getServiceNamed("mutable-watcher")
299         return watcher.list_all_publish()
300     def list_active_publish(self):
301         watcher = self.getServiceNamed("mutable-watcher")
302         return watcher.list_active_publish()
303     def list_recent_publish(self):
304         watcher = self.getServiceNamed("mutable-watcher")
305         return watcher.list_recent_publish()
306
307     def list_all_retrieve(self):
308         watcher = self.getServiceNamed("mutable-watcher")
309         return watcher.list_all_retrieve()
310     def list_active_retrieve(self):
311         watcher = self.getServiceNamed("mutable-watcher")
312         return watcher.list_active_retrieve()
313     def list_recent_retrieve(self):
314         watcher = self.getServiceNamed("mutable-watcher")
315         return watcher.list_recent_retrieve()