2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from zope.interface import implements
7 from twisted.internet import reactor
8 from twisted.application.internet import TimerService
9 from foolscap import Referenceable
10 from foolscap.logging import log
13 from allmydata.storage import StorageServer
14 from allmydata.upload import Uploader
15 from allmydata.download import Downloader
16 from allmydata.checker import Checker
17 from allmydata.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer import IntroducerClient
20 from allmydata.util import hashutil, base32, testutil
21 from allmydata.filenode import FileNode
22 from allmydata.dirnode import NewDirectoryNode
23 from allmydata.mutable import MutableFileNode, MutableWatcher
24 from allmydata.stats import StatsProvider
25 from allmydata.interfaces import IURI, INewDirectoryURI, \
26 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    # An empty remote object. It is published (under the "stub_client"
    # service name, via the introducer) so the introducer can count how
    # many clients are connected; it carries no methods of its own.
    implements(RIStubClient)
# NOTE(review): the enclosing definition line (original line 37, presumably
# `def _make_secret():`) is missing from this listing -- this fragment
# returns a base32-encoded random secret with a trailing newline.
38 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
40 class Client(node.Node, testutil.PollMixin):
# NOTE(review): gapped listing -- original lines 42-43, 45, 48, 55-56 and
# 58-59 are missing (other class constants and the remaining
# DEFAULT_ENCODING_PARAMETERS entries, presumably "happy" and "n");
# do not assume the class header is complete as shown.
41 PORTNUMFILE = "client.port"
# presence/freshness of this file keeps a debug node alive (see
# _check_hotline and the TimerService started in __init__)
44 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
46 # we're pretty narrow-minded right now
47 OLDEST_SUPPORTED_VERSION = allmydata.__version__
49 # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
50 # is the number of shares required to reconstruct a file. 'desired' means
51 # that we will abort an upload unless we can allocate space for at least
52 # this many. 'total' is the total number of shares created by encoding.
53 # If everybody has room then this is how many we will upload.
54 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
57 "max_segment_size": 128*KiB,
60 def __init__(self, basedir="."):
# NOTE(review): gapped listing -- original lines 69-70, 72-74, 76, 84 and
# 86 are missing (presumably the init_storage/init_client calls, the
# run_helper-guarded helper/control setup, and an `if webport:` guard);
# do not assume this constructor is complete as shown.
61 node.Node.__init__(self, basedir)
62 self.logSource="Client"
# nickname comes from config; fall back to a placeholder when unset
63 self.nickname = self.get_config("nickname")
64 if self.nickname is None:
65 self.nickname = "<unspecified>"
66 self.init_introducer_client()
67 self.init_stats_provider()
68 self.init_lease_secret()
# read here but not consumed in the visible lines -- the consuming code
# is presumably among the missing lines
71 run_helper = self.get_config("run_helper")
75 # ControlServer and Helper are attached after Tub startup
# if the "hotline" file exists, poll it once per second so the node can
# shut itself down when the file goes stale (see _check_hotline)
77 hotline_file = os.path.join(self.basedir,
78 self.SUICIDE_PREVENTION_HOTLINE_FILE)
79 if os.path.exists(hotline_file):
80 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
81 self.log("hotline file noticed (%ds old), starting timer" % age)
82 hotline = TimerService(1.0, self._check_hotline, hotline_file)
83 hotline.setServiceParent(self)
85 webport = self.get_config("webport")
87 self.init_web(webport) # strports string
89 def init_introducer_client(self):
# Connect to the introducer named by the required introducer.furl config
# entry and subscribe to "storage" announcements.
90 self.introducer_furl = self.get_config("introducer.furl", required=True)
# NOTE(review): original line 92 is missing between these call arguments
# (presumably `self.nickname,`); the visible call is incomplete as shown.
91 ic = IntroducerClient(self.tub, self.introducer_furl,
93 str(allmydata.__version__),
94 str(self.OLDEST_SUPPORTED_VERSION))
95 self.introducer_client = ic
96 ic.setServiceParent(self)
97 # nodes that want to upload and download will need storage servers
98 ic.subscribe_to("storage")
100 def init_stats_provider(self):
# NOTE(review): original lines 102 and 105 are missing (presumably the
# `if gatherer_furl:` / `else:` pair) -- without them the two
# self.stats_provider assignments below look contradictory; the intent
# appears to be "run a StatsProvider only when a gatherer furl is
# configured" -- confirm against the full source.
101 gatherer_furl = self.get_config('stats_gatherer.furl')
103 self.stats_provider = StatsProvider(self, gatherer_furl)
104 self.add_service(self.stats_provider)
106 self.stats_provider = None
def init_lease_secret(self):
    """Load this node's private lease secret, creating it on first use."""
    secret = self.get_or_create_private_config("secret", _make_secret)
    self._lease_secret = base32.a2b(secret)
112 def init_storage(self):
113 # should we run a storage server (and publish it for others to use)?
# NOTE(review): heavily gapped listing -- original lines 116, 118,
# 120-121, 123, 125, 127, 131-134, 136, 141-142 and 145 are missing
# (presumably the early return, directory creation, the sizelimit
# guard and unit-multiplier table, a stats argument, and the
# `def _publish(res):` header that addCallback expects); do not treat
# this body as runnable or complete as shown.
114 provide_storage = (self.get_config("no_storage") is None)
115 if not provide_storage:
117 readonly_storage = (self.get_config("readonly_storage") is not None)
119 storedir = os.path.join(self.basedir, self.STOREDIR)
# optional human-readable size limit, e.g. "5000000", "3G", "10GB"
122 data = self.get_config("sizelimit")
124 m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
126 log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
128 number, suffix = m.groups()
129 suffix = suffix.upper()
130 if suffix.endswith("B"):
135 "G": 1000 * 1000 * 1000,
137 sizelimit = int(number) * multiplier
138 discard_storage = self.get_config("debug_discard_storage") is not None
139 ss = StorageServer(storedir, sizelimit,
140 discard_storage, readonly_storage,
# registerReference must wait until the Tub has started up
143 d = self.when_tub_ready()
144 # we can't do registerReference until the Tub is ready
146 furl_file = os.path.join(self.basedir, "private", "storage.furl")
147 furl = self.tub.registerReference(ss, furlFile=furl_file)
148 ri_name = RIStorageServer.__remote_name__
149 self.introducer_client.publish(furl, "storage", ri_name)
150 d.addCallback(_publish)
151 d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
153 def init_client(self):
# Set up the client-side services: uploader (optionally via a helper
# furl), downloader, checker, and the mutable-file watcher.
154 helper_furl = self.get_config("helper.furl")
155 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
156 self.convergence = base32.a2b(convergence_s)
157 self.add_service(Uploader(helper_furl))
158 self.add_service(Downloader())
159 self.add_service(Checker())
160 self.add_service(MutableWatcher())
# NOTE(review): original lines 161 and 164-165 are missing (presumably
# the `def _publish(res):` header and the `sc = StubClient()` creation
# that the registerReference call below expects).
162 # we publish an empty object so that the introducer can count how
163 # many clients are connected and see what versions they're
166 furl = self.tub.registerReference(sc)
167 ri_name = RIStubClient.__remote_name__
168 self.introducer_client.publish(furl, "stub_client", ri_name)
169 d = self.when_tub_ready()
170 d.addCallback(_publish)
171 d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
173 def init_control(self):
# Publish a ControlServer once the Tub is ready, recording its furl in
# the node's private config.
174 d = self.when_tub_ready()
# NOTE(review): original lines 175-176 are missing (presumably the
# `def _publish(res):` header and the ControlServer construction that
# binds `c`); the body below is incomplete as shown.
177 c.setServiceParent(self)
178 control_url = self.tub.registerReference(c)
179 self.write_private_config("control.furl", control_url + "\n")
180 d.addCallback(_publish)
181 d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
183 def init_helper(self):
# Run an upload helper, publishing its furl under BASEDIR/private once
# the Tub is ready.
184 d = self.when_tub_ready()
# NOTE(review): original line 185 is missing (presumably the
# `def _publish(res):` header that the addCallback below expects).
186 h = Helper(os.path.join(self.basedir, "helper"))
187 h.setServiceParent(self)
188 # TODO: this is confusing. BASEDIR/private/helper.furl is created
189 # by the helper. BASEDIR/helper.furl is consumed by the client
190 # who wants to use the helper. I like having the filename be the
191 # same, since that makes 'cp' work smoothly, but the difference
192 # between config inputs and generated outputs is hard to see.
193 helper_furlfile = os.path.join(self.basedir,
194 "private", "helper.furl")
195 self.tub.registerReference(h, furlFile=helper_furlfile)
196 d.addCallback(_publish)
197 d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
199 def init_web(self, webport):
# Start the web frontend on the given strports-style port string.
200 self.log("init_web(webport=%s)", args=(webport,))
# NOTE(review): original lines 201 and 207-208 are missing (presumably
# the tail that attaches `ws` to the service tree, e.g.
# `ws.setServiceParent(self)`); the body below is incomplete as shown.
# import is deferred, presumably to avoid paying webish's import cost
# (or a circular import) unless the web frontend is enabled -- confirm
202 from allmydata.webish import WebishServer
203 nodeurl_path = os.path.join(self.basedir, "node.url")
204 ws = WebishServer(webport, nodeurl_path)
# debug switch: allow the webish server to read local files
205 if self.get_config("webport_allow_localfile") is not None:
206 ws.allow_local_access(True)
209 def _check_hotline(self, hotline_file):
# Called every second by the TimerService started in __init__: keeps the
# node alive only while the hotline file exists and has been touched
# within the last ~20 seconds.
# NOTE(review): original lines 213-214, 216 and 218-219 are missing
# (presumably the early `return`, the `else:` branches, and the final
# shutdown call, e.g. reactor.stop()); the control flow below is
# incomplete as shown.
210 if os.path.exists(hotline_file):
211 mtime = os.stat(hotline_file)[stat.ST_MTIME]
212 if mtime > time.time() - 20.0:
215 self.log("hotline file too old, shutting down")
217 self.log("hotline file missing, shutting down")
def get_all_peerids(self):
    """Delegate to the introducer client's get_all_peerids()."""
    ic = self.introducer_client
    return ic.get_all_peerids()
def get_permuted_peers(self, service_name, key):
    """Delegate to the introducer client's permuted-peer list.

    @return: list of (peerid, connection,)
    """
    assert isinstance(service_name, str)
    assert isinstance(key, str)
    ic = self.introducer_client
    return ic.get_permuted_peers(service_name, key)
def get_encoding_parameters(self):
    """Return the node-wide default encoding-parameter dict (class attribute)."""
    defaults = self.DEFAULT_ENCODING_PARAMETERS
    return defaults
234 def connected_to_introducer(self):
# NOTE(review): original lines 237-238 are missing (presumably the
# fallback for when self.introducer_client is not set, e.g.
# `return False`); as shown the no-client path would return None.
235 if self.introducer_client:
236 return self.introducer_client.connected_to_introducer()
def get_renewal_secret(self):
    """Derive the lease-renewal secret from this node's master lease secret."""
    master = self._lease_secret
    return hashutil.my_renewal_secret_hash(master)
def get_cancel_secret(self):
    """Derive the lease-cancel secret from this node's master lease secret."""
    master = self._lease_secret
    return hashutil.my_cancel_secret_hash(master)
245 def debug_wait_for_client_connections(self, num_clients):
246 """Return a Deferred that fires (with None) when we have connections
247 to the given number of peers. Useful for tests that set up a
248 temporary test network and need to know when it is safe to proceed
249 with an upload or download."""
# NOTE(review): original lines 250 and 255 are missing (presumably the
# `def _check():` header for the polling closure below and the final
# `return d`); the body is incomplete as shown.
251 current_clients = list(self.get_all_peerids())
252 return len(current_clients) >= num_clients
# poll the closure every 0.5s via PollMixin
253 d = self.poll(_check, 0.5)
254 d.addCallback(lambda res: None)
258 # these four methods are the primitives for creating filenodes and
259 # dirnodes. The first takes a URI and produces a filenode or (new-style)
260 # dirnode. The other three create brand-new filenodes/dirnodes.
262 def create_node_from_uri(self, u):
263 # this returns synchronously.
# NOTE(review): original line 264 is missing (presumably a coercion such
# as `u = IURI(u)`, which the IURI import suggests); lines 269 and 272
# (comments) are also missing.
# dispatch on the URI's interface: read-only dirnode, read-write
# dirnode, immutable file, or (by elimination) mutable file
265 if IReadonlyNewDirectoryURI.providedBy(u):
266 # new-style read-only dirnodes
267 return NewDirectoryNode(self).init_from_uri(u)
268 if INewDirectoryURI.providedBy(u):
270 return NewDirectoryNode(self).init_from_uri(u)
271 if IFileURI.providedBy(u):
273 return FileNode(u, self)
# only mutable-file URIs remain; anything else is a caller bug
274 assert IMutableFileURI.providedBy(u), u
275 return MutableFileNode(self).init_from_uri(u)
def notify_publish(self, p):
    # Forward a mutable-file publish event to the "mutable-watcher" service.
    watcher = self.getServiceNamed("mutable-watcher")
    watcher.notify_publish(p)
def notify_retrieve(self, r):
    # Forward a mutable-file retrieve event to the "mutable-watcher" service.
    watcher = self.getServiceNamed("mutable-watcher")
    watcher.notify_retrieve(r)
282 def create_empty_dirnode(self):
283 n = NewDirectoryNode(self)
# NOTE(review): original lines 284 and 286-287 are missing (presumably
# `d = n.create()` and the final `return d`); `d` below is otherwise
# unbound, so this body is incomplete as shown.
285 d.addCallback(lambda res: n)
288 def create_mutable_file(self, contents=""):
# Create a brand-new mutable file holding `contents`; the Deferred chain
# is arranged to fire with the new node.
289 n = MutableFileNode(self)
290 d = n.create(contents)
291 d.addCallback(lambda res: n)
# NOTE(review): original lines 292-293 are missing (presumably the final
# `return d`); as shown this method would return None.
def upload(self, uploadable):
    """Hand the uploadable off to the "uploader" service and return its result."""
    return self.getServiceNamed("uploader").upload(uploadable)
def list_all_uploads(self):
    # Delegate to the "uploader" service.
    return self.getServiceNamed("uploader").list_all_uploads()

def list_active_uploads(self):
    # Delegate to the "uploader" service.
    return self.getServiceNamed("uploader").list_active_uploads()

def list_recent_uploads(self):
    # Delegate to the "uploader" service.
    return self.getServiceNamed("uploader").list_recent_uploads()
def list_all_downloads(self):
    # Delegate to the "downloader" service.
    return self.getServiceNamed("downloader").list_all_downloads()

def list_active_downloads(self):
    # Delegate to the "downloader" service.
    return self.getServiceNamed("downloader").list_active_downloads()

def list_recent_downloads(self):
    # Delegate to the "downloader" service.
    return self.getServiceNamed("downloader").list_recent_downloads()
def list_all_publish(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_all_publish()

def list_active_publish(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_active_publish()

def list_recent_publish(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_recent_publish()
def list_all_retrieve(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_all_retrieve()

def list_active_retrieve(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_active_retrieve()

def list_recent_retrieve(self):
    # Delegate to the "mutable-watcher" service.
    return self.getServiceNamed("mutable-watcher").list_recent_retrieve()