2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.logging import log
11 from allmydata.storage import StorageServer
12 from allmydata.upload import Uploader
13 from allmydata.download import Downloader
14 from allmydata.checker import Checker
15 from allmydata.offloaded import Helper
16 from allmydata.control import ControlServer
17 from allmydata.introducer import IntroducerClient
18 from allmydata.util import hashutil, base32, testutil
19 from allmydata.filenode import FileNode
20 from allmydata.dirnode import NewDirectoryNode
21 from allmydata.mutable import MutableFileNode, MutableWatcher
22 from allmydata.stats import StatsProvider
23 from allmydata.interfaces import IURI, INewDirectoryURI, \
24 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
class Client(node.Node, testutil.PollMixin):
    # Top-level Tahoe client node: wires together the introducer client,
    # stats provider, storage/upload/download services (see __init__).
    # File that records which TCP port the node listens on.
    PORTNUMFILE = "client.port"
    # If this file exists in basedir it must be touched periodically or the
    # node shuts itself down (see _check_hotline); used to reap test nodes.
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
    # we're pretty narrow-minded right now
    OLDEST_SUPPORTED_VERSION = allmydata.__version__
    # this is a dict of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 1*MiB,
    def __init__(self, basedir="."):
        """Create a Tahoe client node rooted at 'basedir'.

        Reads configuration via get_config(), starts the introducer client,
        stats provider and lease secret, attaches the uploader / downloader /
        checker / mutable-watcher services, arms the suicide-prevention
        hotline timer when the hotline file exists, and starts the web
        frontend when a webport is configured.
        """
        node.Node.__init__(self, basedir)
        self.logSource="Client"
        # nickname is purely cosmetic; fall back to a visible placeholder
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # NOTE(review): 'run_helper' is unused in the lines visible here --
        # presumably it gates init_helper() in elided code; confirm upstream.
        run_helper = self.get_config("run_helper")
        # helper_furl may be None, in which case the Uploader works unassisted
        helper_furl = self.get_config("helper.furl")
        self.add_service(Uploader(helper_furl))
        self.add_service(Downloader())
        self.add_service(Checker())
        self.add_service(MutableWatcher())
        # ControlServer and Helper are attached after Tub startup
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            # poll every second; _check_hotline shuts the node down when the
            # file goes stale or disappears
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)
        webport = self.get_config("webport")
        self.init_web(webport) # strports string
    def init_introducer_client(self):
        """Connect to the (mandatory) introducer and subscribe to storage
        announcements so this node can find storage servers."""
        self.introducer_furl = self.get_config("introducer.furl", required=True)
        # advertise both our current version and the oldest version we can
        # still interoperate with
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        ic.setServiceParent(self)
        # nodes that want to upload and download will need storage servers
        ic.subscribe_to("storage")
    def init_stats_provider(self):
        """Start a StatsProvider when a stats gatherer FURL is configured;
        otherwise leave self.stats_provider as None."""
        gatherer_furl = self.get_config('stats_gatherer.furl')
            # NOTE(review): the guarding 'if gatherer_furl:' / 'else:' lines
            # are elided in this chunk; the indentation implies them.
            self.stats_provider = StatsProvider(self, gatherer_furl)
            self.add_service(self.stats_provider)
            self.stats_provider = None
    def init_lease_secret(self):
        # Load (or create on first run) the node-wide lease secret from
        # BASEDIR/private/secret; renewal/cancel secrets are derived from it.
        # NOTE(review): the nested 'def make_secret():' line is elided here.
            return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
        secret_s = self.get_or_create_private_config("secret", make_secret)
        self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        """Create the StorageServer and publish its FURL via the introducer.

        Honors the 'no_storage', 'readonly_storage', 'sizelimit', and
        'debug_discard_storage' config knobs. Tub registration is deferred
        until the Tub is ready.
        """
        # should we run a storage server (and publish it for others to use)?
        provide_storage = (self.get_config("no_storage") is None)
        if not provide_storage:
        readonly_storage = (self.get_config("readonly_storage") is not None)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # 'sizelimit' is an integer with an optional k/M/G multiplier and
        # optional trailing 'B', e.g. "5G" (decimal multipliers, per the
        # "G": 1000*1000*1000 entry below)
        data = self.get_config("sizelimit")
            m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
                log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
            number, suffix = m.groups()
            suffix = suffix.upper()
            if suffix.endswith("B"):
                   "G": 1000 * 1000 * 1000,
            sizelimit = int(number) * multiplier
        discard_storage = self.get_config("debug_discard_storage") is not None
        ss = StorageServer(storedir, sizelimit,
                           discard_storage, readonly_storage,
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_control(self):
        """Attach a ControlServer once the Tub is ready and record its FURL
        with write_private_config('control.furl')."""
        d = self.when_tub_ready()
        # NOTE(review): the 'def _publish(res):' / 'c = ControlServer()'
        # lines are elided in this chunk.
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_helper(self):
        """Start an upload Helper (rooted at BASEDIR/helper) once the Tub is
        ready, and register it under BASEDIR/private/helper.furl."""
        d = self.when_tub_ready()
        # NOTE(review): the 'def _publish(res):' line is elided in this chunk.
            h = Helper(os.path.join(self.basedir, "helper"))
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_web(self, webport):
        """Start the WebishServer frontend on 'webport' (a strports string).

        'webport_allow_localfile' additionally enables local-file access in
        the web UI. NOTE(review): the lines that attach ws as a service are
        elided in this chunk; confirm in the full file.
        """
        self.log("init_web(webport=%s)", args=(webport,))
        # imported here rather than at module level (original structure kept)
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        if self.get_config("webport_allow_localfile") is not None:
            ws.allow_local_access(True)
    def _check_hotline(self, hotline_file):
        # Called once per second by the TimerService armed in __init__.
        # Keeps the node alive only while the hotline file exists and has
        # been touched within the last 20 seconds.
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
            # NOTE(review): the early-return and 'else:' lines, and the
            # actual shutdown call, are elided in this chunk.
            self.log("hotline file too old, shutting down")
            self.log("hotline file missing, shutting down")
198 def get_all_peerids(self):
199 return self.introducer_client.get_all_peerids()
    def get_permuted_peers(self, service_name, key):
        """Ask the introducer client for the peers offering 'service_name',
        permuted by 'key'.

        @return: list of (peerid, connection,)
        """
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
209 def get_encoding_parameters(self):
210 return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        # Report whether the introducer connection is currently up.
        # NOTE(review): the fallback branch (introducer_client unset) is
        # elided in this chunk -- presumably it returns False.
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
217 def get_renewal_secret(self):
218 return hashutil.my_renewal_secret_hash(self._lease_secret)
220 def get_cancel_secret(self):
221 return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): the nested 'def _check():' and trailing 'return d'
        # lines are elided in this chunk.
            current_clients = list(self.get_all_peerids())
            return len(current_clients) >= num_clients
        # poll every 0.5s until _check says enough peers are connected
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
236 # these four methods are the primitives for creating filenodes and
237 # dirnodes. The first takes a URI and produces a filenode or (new-style)
238 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        """Turn a URI into the matching node object: NewDirectoryNode for
        (read-only or read-write) dirnode URIs, FileNode for immutable file
        URIs, MutableFileNode otherwise (asserted to be a mutable-file URI).
        NOTE(review): a coercion line (orig line 242, presumably IURI(u)) is
        elided in this chunk."""
        # this returns synchronously.
        if IReadonlyNewDirectoryURI.providedBy(u):
            # new-style read-only dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if INewDirectoryURI.providedBy(u):
            # new-style read-write dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if IFileURI.providedBy(u):
            # immutable files
            return FileNode(u, self)
        assert IMutableFileURI.providedBy(u), u
        return MutableFileNode(self).init_from_uri(u)
255 def notify_publish(self, p):
256 self.getServiceNamed("mutable-watcher").notify_publish(p)
257 def notify_retrieve(self, r):
258 self.getServiceNamed("mutable-watcher").notify_retrieve(r)
    def create_empty_dirnode(self):
        # Create a brand-new (empty) dirnode; the Deferred fires with the
        # node itself once creation completes.
        n = NewDirectoryNode(self)
        # NOTE(review): the 'd = n.create(...)' and 'return d' lines are
        # elided in this chunk.
        d.addCallback(lambda res: n)
    def create_mutable_file(self, contents=""):
        # Create a brand-new mutable file holding 'contents'; the Deferred
        # fires with the node itself once creation completes.
        # NOTE(review): the trailing 'return d' line is elided in this chunk.
        n = MutableFileNode(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
272 def upload(self, uploadable):
273 uploader = self.getServiceNamed("uploader")
274 return uploader.upload(uploadable)
277 def list_all_uploads(self):
278 uploader = self.getServiceNamed("uploader")
279 return uploader.list_all_uploads()
280 def list_active_uploads(self):
281 uploader = self.getServiceNamed("uploader")
282 return uploader.list_active_uploads()
283 def list_recent_uploads(self):
284 uploader = self.getServiceNamed("uploader")
285 return uploader.list_recent_uploads()
287 def list_all_downloads(self):
288 downloader = self.getServiceNamed("downloader")
289 return downloader.list_all_downloads()
290 def list_active_downloads(self):
291 downloader = self.getServiceNamed("downloader")
292 return downloader.list_active_downloads()
293 def list_recent_downloads(self):
294 downloader = self.getServiceNamed("downloader")
295 return downloader.list_recent_downloads()
297 def list_all_publish(self):
298 watcher = self.getServiceNamed("mutable-watcher")
299 return watcher.list_all_publish()
300 def list_active_publish(self):
301 watcher = self.getServiceNamed("mutable-watcher")
302 return watcher.list_active_publish()
303 def list_recent_publish(self):
304 watcher = self.getServiceNamed("mutable-watcher")
305 return watcher.list_recent_publish()
307 def list_all_retrieve(self):
308 watcher = self.getServiceNamed("mutable-watcher")
309 return watcher.list_all_retrieve()
310 def list_active_retrieve(self):
311 watcher = self.getServiceNamed("mutable-watcher")
312 return watcher.list_active_retrieve()
313 def list_recent_retrieve(self):
314 watcher = self.getServiceNamed("mutable-watcher")
315 return watcher.list_recent_retrieve()