2 import os, stat, time, re
3 from allmydata.interfaces import RIStorageServer
4 from allmydata import node
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.logging import log
11 from allmydata.storage import StorageServer
12 from allmydata.upload import Uploader
13 from allmydata.download import Downloader
14 from allmydata.checker import Checker
15 from allmydata.offloaded import Helper
16 from allmydata.control import ControlServer
17 from allmydata.introducer import IntroducerClient
18 from allmydata.util import hashutil, base32, testutil
19 from allmydata.filenode import FileNode
20 from allmydata.dirnode import NewDirectoryNode
21 from allmydata.mutable import MutableFileNode
22 from allmydata.stats import StatsProvider
23 from allmydata.interfaces import IURI, INewDirectoryURI, \
24 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
class Client(node.Node, testutil.PollMixin):
    # Client node: combines the generic Node machinery with uploader,
    # downloader, checker, storage, and introducer-client services.
    # File in which our allocated listening-port number is recorded.
    PORTNUMFILE = "client.port"
    # Presence of this file in the basedir keeps the node alive; its mtime is
    # watched by _check_hotline() below.
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
    # we're pretty narrow-minded right now
    # NOTE(review): this references a bare 'allmydata' name; only
    # 'from allmydata import node' is visible above -- a plain
    # 'import allmydata' is presumably elided from this view.
    OLDEST_SUPPORTED_VERSION = allmydata.__version__
    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    # NOTE(review): entries (presumably "happy" and "n") and the closing
    # brace of this dict appear to be elided from this view; 'MiB' is also
    # not imported in the visible import block. Confirm against VCS.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "max_segment_size": 1*MiB,
    def __init__(self, basedir="."):
        # Build the client node rooted at *basedir*: read config, then bring
        # up the introducer connection, stats provider, lease secret, and the
        # core uploader/downloader/checker services.
        node.Node.__init__(self, basedir)
        self.logSource="Client"
        self.nickname = self.get_config("nickname")
        if self.nickname is None:
            self.nickname = "<unspecified>"
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        # NOTE(review): lines appear to be elided here -- presumably calls
        # such as init_storage()/init_control(), and a guard that consults
        # run_helper before starting the Helper. Confirm against VCS.
        run_helper = self.get_config("run_helper")
        helper_furl = self.get_config("helper.furl")
        self.add_service(Uploader(helper_furl))
        self.add_service(Downloader())
        self.add_service(Checker())
        # ControlServer and Helper are attached after Tub startup
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)
        webport = self.get_config("webport")
        # NOTE(review): an 'if webport:' guard appears to be elided before
        # the next line.
        self.init_web(webport) # strports string
    def init_introducer_client(self):
        # Connect to the introducer (its furl is required config) and
        # subscribe to announcements of storage servers.
        self.introducer_furl = self.get_config("introducer.furl", required=True)
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              # NOTE(review): an argument (presumably the
                              # node nickname) appears to be elided here.
                              str(allmydata.__version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        ic.setServiceParent(self)
        # nodes that want to upload and download will need storage servers
        ic.subscribe_to("storage")
    def init_stats_provider(self):
        # Start a StatsProvider pointed at the (optional) stats gatherer.
        gatherer_furl = self.get_config('stats_gatherer.furl')
        # NOTE(review): an if/else appears to be elided here -- as written,
        # the provider would be created, added, then unconditionally replaced
        # by None. Presumably the None branch runs only when no gatherer furl
        # is configured. Confirm against VCS.
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider = None
    def init_lease_secret(self):
        # Load (or lazily create) the private "secret" config value, from
        # which the renewal/cancel lease secrets below are derived.
        # NOTE(review): a nested 'def make_secret():' header appears to be
        # elided before the next line.
        return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
        secret_s = self.get_or_create_private_config("secret", make_secret)
        self._lease_secret = base32.a2b(secret_s)
    def init_storage(self):
        # Create our StorageServer (honoring no_storage/readonly/sizelimit
        # config) and, once the Tub is ready, register it and publish its
        # furl to the introducer.
        # NOTE(review): several lines appear to be elided throughout this
        # method (an early return, the sizelimit guard, the K/M/G multiplier
        # table, the tail of the StorageServer() call, and the nested
        # 'def _publish(res):' header). Confirm against VCS before relying
        # on the control flow shown here.
        # should we run a storage server (and publish it for others to use)?
        provide_storage = (self.get_config("no_storage") is None)
        if not provide_storage:
            # (elided: presumably a bare 'return' as this guard's body)
            readonly_storage = (self.get_config("readonly_storage") is not None)
        storedir = os.path.join(self.basedir, self.STOREDIR)
        # sizelimit is a human-friendly string like "5g" or "100MB"
        data = self.get_config("sizelimit")
        m = re.match(r"^(\d+)([kKmMgG]?[bB]?)$", data)
        # logged when the regexp above fails to match
        log.msg("SIZELIMIT_FILE contains unparseable value %s" % data)
        number, suffix = m.groups()
        suffix = suffix.upper()
        if suffix.endswith("B"):
            # (elided: the rest of the suffix handling / multiplier table)
            "G": 1000 * 1000 * 1000,
        sizelimit = int(number) * multiplier
        discard_storage = self.get_config("debug_discard_storage") is not None
        ss = StorageServer(storedir, sizelimit,
                           discard_storage, readonly_storage,
        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        furl_file = os.path.join(self.basedir, "private", "storage.furl")
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_control(self):
        # Once the Tub is ready, attach a ControlServer and write its furl to
        # BASEDIR/private/control.furl for use by debugging/control tools.
        d = self.when_tub_ready()
        # NOTE(review): the nested 'def _publish(res):' header and the
        # ControlServer() construction appear to be elided before the next
        # three lines (which read as that callback's body).
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_helper(self):
        # Once the Tub is ready, attach a Helper service and register it,
        # persisting its furl in BASEDIR/private/helper.furl.
        d = self.when_tub_ready()
        # NOTE(review): a nested 'def _publish(res):' header appears to be
        # elided before the next lines (which read as that callback's body).
        h = Helper(os.path.join(self.basedir, "helper"))
        h.setServiceParent(self)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl")
        self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
    def init_web(self, webport):
        # Start the web UI, listening on *webport* (a strports string), with
        # the node's URL recorded in BASEDIR/node.url.
        self.log("init_web(webport=%s)", args=(webport,))
        # imported here rather than at module level -- presumably to avoid
        # pulling in the web stack unless the web UI is enabled; confirm.
        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = WebishServer(webport, nodeurl_path)
        if self.get_config("webport_allow_localfile") is not None:
            ws.allow_local_access(True)
        # NOTE(review): the tail of this method (presumably attaching ws as
        # a child service) appears to be elided.
    def _check_hotline(self, hotline_file):
        # TimerService callback (see __init__): keep running only while the
        # hotline file exists and was touched within the last ~20 seconds.
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 20.0:
                # NOTE(review): lines appear to be elided here -- presumably
                # a 'return' on the fresh-file path, 'else:' branches before
                # each log call below, and the actual shutdown (e.g.
                # reactor.stop()) after them. Confirm against VCS.
                self.log("hotline file too old, shutting down")
        self.log("hotline file missing, shutting down")
197 def get_all_peerids(self):
198 return self.introducer_client.get_all_peerids()
    def get_permuted_peers(self, service_name, key):
        # Return the peers offering *service_name*, permuted by *key*, as
        # provided by the introducer client.
        # NOTE(review): the triple-quote delimiters around the next docstring
        # line appear to be elided from this view.
        @return: list of (peerid, connection,)
        assert isinstance(service_name, str)
        assert isinstance(key, str)
        return self.introducer_client.get_permuted_peers(service_name, key)
208 def get_encoding_parameters(self):
209 return self.DEFAULT_ENCODING_PARAMETERS
    def connected_to_introducer(self):
        # True when we have an introducer client reporting a live connection.
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        # NOTE(review): a fallback (presumably 'return False') appears to be
        # elided here; as shown, this path would return None.
216 def get_renewal_secret(self):
217 return hashutil.my_renewal_secret_hash(self._lease_secret)
219 def get_cancel_secret(self):
220 return hashutil.my_cancel_secret_hash(self._lease_secret)
    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        # NOTE(review): a nested 'def _check():' header appears to be elided
        # before the next two lines, and a trailing 'return d' after the
        # poll. Confirm against VCS.
        current_clients = list(self.get_all_peerids())
        return len(current_clients) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
235 # these four methods are the primitives for creating filenodes and
236 # dirnodes. The first takes a URI and produces a filenode or (new-style)
237 # dirnode. The other three create brand-new filenodes/dirnodes.
    def create_node_from_uri(self, u):
        # Given a URI, build the matching node object: a (possibly read-only)
        # dirnode, an immutable filenode, or a mutable filenode.
        # this returns synchronously.
        # NOTE(review): a coercion such as 'u = IURI(u)' appears to be elided
        # here (IURI is imported above but otherwise unused in this view).
        if IReadonlyNewDirectoryURI.providedBy(u):
            # new-style read-only dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if INewDirectoryURI.providedBy(u):
            # new-style (writable) dirnodes
            return NewDirectoryNode(self).init_from_uri(u)
        if IFileURI.providedBy(u):
            # immutable filenodes
            return FileNode(u, self)
        # anything else must be a mutable file URI
        assert IMutableFileURI.providedBy(u), u
        return MutableFileNode(self).init_from_uri(u)
    def create_empty_dirnode(self):
        # Create a brand-new empty directory node; the Deferred fires with
        # the new NewDirectoryNode.
        n = NewDirectoryNode(self)
        # NOTE(review): the creation call (presumably 'd = n.create()') and a
        # trailing 'return d' appear to be elided.
        d.addCallback(lambda res: n)
    def create_mutable_file(self, contents=""):
        # Create a brand-new mutable file holding *contents*; the Deferred
        # fires with the new MutableFileNode.
        n = MutableFileNode(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
        # NOTE(review): a trailing 'return d' appears to be elided.
266 def upload(self, uploadable):
267 uploader = self.getServiceNamed("uploader")
268 return uploader.upload(uploadable)
270 def list_all_uploads(self):
271 uploader = self.getServiceNamed("uploader")
272 return uploader.list_all_uploads()
274 def list_all_downloads(self):
275 downloader = self.getServiceNamed("downloader")
276 return downloader.list_all_downloads()
278 def list_recent_uploads(self):
279 uploader = self.getServiceNamed("uploader")
280 return uploader.list_recent_uploads()
282 def list_recent_downloads(self):
283 downloader = self.getServiceNamed("downloader")
284 return downloader.list_recent_downloads()