]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
retain 10 most recent upload/download status objects, show them in /status . Prep...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1
2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
5
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.logging import log
9
10 import allmydata
11 from allmydata.storage import StorageServer
12 from allmydata.upload import Uploader
13 from allmydata.download import Downloader
14 from allmydata.checker import Checker
15 from allmydata.offloaded import Helper
16 from allmydata.control import ControlServer
17 from allmydata.introducer import IntroducerClient
18 from allmydata.util import hashutil, base32, testutil
19 from allmydata.filenode import FileNode
20 from allmydata.dirnode import NewDirectoryNode
21 from allmydata.mutable import MutableFileNode
22 from allmydata.stats import StatsProvider
23 from allmydata.interfaces import IURI, INewDirectoryURI, \
24      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
25
# binary (IEC) size units; MiB is used for the default max_segment_size below
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
31
class Client(node.Node, testutil.PollMixin):
    """A Tahoe client node: wires together the introducer client, stats
    provider, storage server, uploader/downloader/checker, control server,
    optional helper, and optional web server."""

    PORTNUMFILE = "client.port"
    # directory (under basedir) where the storage server keeps shares
    STOREDIR = 'storage'
    NODETYPE = "client"
    # if this file exists in basedir, a timer shuts the node down once the
    # file goes stale -- see __init__ and _check_hotline
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # we're pretty narrow-minded right now
    OLDEST_SUPPORTED_VERSION = allmydata.__version__

    # default erasure-coding parameters. 'k' is the number of shares
    # required to reconstruct a file. 'happy' means we will abort an upload
    # unless we can allocate space for at least this many shares. 'n' is the
    # total number of shares created by encoding; if everybody has room then
    # this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 1*MiB,
                                   }
51
    def __init__(self, basedir="."):
        """Create the client node rooted at 'basedir'.

        Reads settings via get_config(), then starts (in order) the
        introducer client, stats provider, lease secret, storage server,
        and control server, plus the uploader/downloader/checker services.
        If the hotline file exists in basedir, a 1-second TimerService is
        started that shuts the node down once the file goes stale (see
        _check_hotline). If 'webport' is configured, the web server is
        started as well.
        """
        node.Node.__init__(self, basedir)
        self.logSource="Client"
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        run_helper = self.get_config("run_helper")
        if run_helper:
            self.init_helper()
        helper_furl = self.get_config("helper.furl")
        self.add_service(Uploader(helper_furl))
        self.add_service(Downloader())
        self.add_service(Checker())
        # ControlServer and Helper are attached after Tub startup

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        webport = self.get_config("webport")
        if webport:
            self.init_web(webport) # strports string
83
84     def init_introducer_client(self):
85         self.introducer_furl = self.get_config("introducer.furl", required=True)
86         ic = IntroducerClient(self.tub, self.introducer_furl,
87                               self.nickname,
88                               str(allmydata.__version__),
89                               str(self.OLDEST_SUPPORTED_VERSION))
90         self.introducer_client = ic
91         ic.setServiceParent(self)
92         # nodes that want to upload and download will need storage servers
93         ic.subscribe_to("storage")
94
95     def init_stats_provider(self):
96         gatherer_furl = self.get_config('stats_gatherer.furl')
97         if gatherer_furl:
98             self.stats_provider = StatsProvider(self, gatherer_furl)
99             self.add_service(self.stats_provider)
100         else:
101             self.stats_provider = None
102
103     def init_lease_secret(self):
104         def make_secret():
105             return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
106         secret_s = self.get_or_create_private_config("secret", make_secret)
107         self._lease_secret = base32.a2b(secret_s)
108
109     def init_storage(self):
110         # should we run a storage server (and publish it for others to use)?
111         provide_storage = (self.get_config("no_storage") is None)
112         if not provide_storage:
113             return
114         readonly_storage = (self.get_config("readonly_storage") is not None)
115
116         storedir = os.path.join(self.basedir, self.STOREDIR)
117         sizelimit = None
118
119         data = self.get_config("sizelimit")
120         if data:
121             m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
122             if not m:
123                 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
124             else:
125                 number, suffix = m.groups()
126                 suffix = suffix.upper()
127                 if suffix.endswith("B"):
128                     suffix = suffix[:-1]
129                 multiplier = {"": 1,
130                               "K": 1000,
131                               "M": 1000 * 1000,
132                               "G": 1000 * 1000 * 1000,
133                               }[suffix]
134                 sizelimit = int(number) * multiplier
135         discard_storage = self.get_config("debug_discard_storage") is not None
136         ss = StorageServer(storedir, sizelimit,
137                            discard_storage, readonly_storage,
138                            self.stats_provider)
139         self.add_service(ss)
140         d = self.when_tub_ready()
141         # we can't do registerReference until the Tub is ready
142         def _publish(res):
143             furl_file = os.path.join(self.basedir, "private", "storage.furl")
144             furl = self.tub.registerReference(ss, furlFile=furl_file)
145             ri_name = RIStorageServer.__remote_name__
146             self.introducer_client.publish(furl, "storage", ri_name)
147         d.addCallback(_publish)
148         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
149
150     def init_control(self):
151         d = self.when_tub_ready()
152         def _publish(res):
153             c = ControlServer()
154             c.setServiceParent(self)
155             control_url = self.tub.registerReference(c)
156             self.write_private_config("control.furl", control_url + "\n")
157         d.addCallback(_publish)
158         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
159
160     def init_helper(self):
161         d = self.when_tub_ready()
162         def _publish(self):
163             h = Helper(os.path.join(self.basedir, "helper"))
164             h.setServiceParent(self)
165             # TODO: this is confusing. BASEDIR/private/helper.furl is created
166             # by the helper. BASEDIR/helper.furl is consumed by the client
167             # who wants to use the helper. I like having the filename be the
168             # same, since that makes 'cp' work smoothly, but the difference
169             # between config inputs and generated outputs is hard to see.
170             helper_furlfile = os.path.join(self.basedir,
171                                            "private", "helper.furl")
172             self.tub.registerReference(h, furlFile=helper_furlfile)
173         d.addCallback(_publish)
174         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
175
176     def init_web(self, webport):
177         self.log("init_web(webport=%s)", args=(webport,))
178
179         from allmydata.webish import WebishServer
180         nodeurl_path = os.path.join(self.basedir, "node.url")
181         ws = WebishServer(webport, nodeurl_path)
182         if self.get_config("webport_allow_localfile") is not None:
183             ws.allow_local_access(True)
184         self.add_service(ws)
185
186     def _check_hotline(self, hotline_file):
187         if os.path.exists(hotline_file):
188             mtime = os.stat(hotline_file)[stat.ST_MTIME]
189             if mtime > time.time() - 20.0:
190                 return
191             else:
192                 self.log("hotline file too old, shutting down")
193         else:
194             self.log("hotline file missing, shutting down")
195         reactor.stop()
196
197     def get_all_peerids(self):
198         return self.introducer_client.get_all_peerids()
199
200     def get_permuted_peers(self, service_name, key):
201         """
202         @return: list of (peerid, connection,)
203         """
204         assert isinstance(service_name, str)
205         assert isinstance(key, str)
206         return self.introducer_client.get_permuted_peers(service_name, key)
207
208     def get_encoding_parameters(self):
209         return self.DEFAULT_ENCODING_PARAMETERS
210
211     def connected_to_introducer(self):
212         if self.introducer_client:
213             return self.introducer_client.connected_to_introducer()
214         return False
215
216     def get_renewal_secret(self):
217         return hashutil.my_renewal_secret_hash(self._lease_secret)
218
219     def get_cancel_secret(self):
220         return hashutil.my_cancel_secret_hash(self._lease_secret)
221
222     def debug_wait_for_client_connections(self, num_clients):
223         """Return a Deferred that fires (with None) when we have connections
224         to the given number of peers. Useful for tests that set up a
225         temporary test network and need to know when it is safe to proceed
226         with an upload or download."""
227         def _check():
228             current_clients = list(self.get_all_peerids())
229             return len(current_clients) >= num_clients
230         d = self.poll(_check, 0.5)
231         d.addCallback(lambda res: None)
232         return d
233
234
    # these methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The others create brand-new filenodes/dirnodes.
238
239     def create_node_from_uri(self, u):
240         # this returns synchronously.
241         u = IURI(u)
242         if IReadonlyNewDirectoryURI.providedBy(u):
243             # new-style read-only dirnodes
244             return NewDirectoryNode(self).init_from_uri(u)
245         if INewDirectoryURI.providedBy(u):
246             # new-style dirnodes
247             return NewDirectoryNode(self).init_from_uri(u)
248         if IFileURI.providedBy(u):
249             # CHK
250             return FileNode(u, self)
251         assert IMutableFileURI.providedBy(u), u
252         return MutableFileNode(self).init_from_uri(u)
253
254     def create_empty_dirnode(self):
255         n = NewDirectoryNode(self)
256         d = n.create()
257         d.addCallback(lambda res: n)
258         return d
259
260     def create_mutable_file(self, contents=""):
261         n = MutableFileNode(self)
262         d = n.create(contents)
263         d.addCallback(lambda res: n)
264         return d
265
266     def upload(self, uploadable):
267         uploader = self.getServiceNamed("uploader")
268         return uploader.upload(uploadable)
269
270     def list_all_uploads(self):
271         uploader = self.getServiceNamed("uploader")
272         return uploader.list_all_uploads()
273
274     def list_all_downloads(self):
275         downloader = self.getServiceNamed("downloader")
276         return downloader.list_all_downloads()
277
278     def list_recent_uploads(self):
279         uploader = self.getServiceNamed("uploader")
280         return uploader.list_recent_uploads()
281
282     def list_recent_downloads(self):
283         downloader = self.getServiceNamed("downloader")
284         return downloader.list_recent_downloads()