import os, stat, time, weakref

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata
from allmydata import node
from allmydata import storage_client
from allmydata.interfaces import RIStorageServer
from allmydata.storage.server import StorageServer
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.filenode import MutableFileNode
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
     IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
class StubClient(Referenceable):
    # An empty remotely-referenceable object, published to the introducer so
    # it can count how many clients are connected and see what versions they
    # run (see Client.init_stub_client below).
    implements(RIStubClient)
def _make_secret():
    """Return a fresh random secret as a base32 string plus trailing newline.

    Used as the 'create' callback for get_or_create_private_config(), e.g.
    for the lease secret and the convergence secret.
    """
    # NOTE(review): the 'def' line was lost in extraction and has been
    # restored; callers at init_lease_secret/init_client reference this name.
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
KiB = 1024  # size unit for DEFAULT_ENCODING_PARAMETERS (restored; lost in extraction)

class Client(node.Node, pollmixin.PollMixin):
    """A Tahoe client node: wires together the introducer client, optional
    storage server, uploader/downloader services, and frontends."""
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    # STOREDIR is referenced by init_storage(); restored from upstream.
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    # NOTE(review): the 'happy'/'n' entries and closing brace were lost in
    # extraction; init_client() reads DEP["n"] and DEP["happy"], and the
    # values below are the upstream defaults.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    # set this to override the size of the RSA keys created for new mutable
    # files. The default of None means to let mutable.filenode choose its own
    # size, which means 2048 bits.
    DEFAULT_MUTABLE_KEYSIZE = None
71 def __init__(self, basedir="."):
72 node.Node.__init__(self, basedir)
73 self.started_timestamp = time.time()
74 self.logSource="Client"
75 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
76 self.init_introducer_client()
77 self.init_stats_provider()
78 self.init_lease_secret()
81 if self.get_config("helper", "enabled", False, boolean=True):
84 self._key_generator = None
85 key_gen_furl = self.get_config("client", "key_generator.furl", None)
87 self.init_key_gen(key_gen_furl)
88 # ControlServer and Helper are attached after Tub startup
89 self.init_ftp_server()
90 self.init_sftp_server()
92 hotline_file = os.path.join(self.basedir,
93 self.SUICIDE_PREVENTION_HOTLINE_FILE)
94 if os.path.exists(hotline_file):
95 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
96 self.log("hotline file noticed (%ds old), starting timer" % age)
97 hotline = TimerService(1.0, self._check_hotline, hotline_file)
98 hotline.setServiceParent(self)
100 # this needs to happen last, so it can use getServiceNamed() to
101 # acquire references to StorageServer and other web-statusable things
102 webport = self.get_config("node", "web.port", None)
104 self.init_web(webport) # strports string
106 def read_old_config_files(self):
107 node.Node.read_old_config_files(self)
108 copy = self._copy_config_from_file
109 copy("introducer.furl", "client", "introducer.furl")
110 copy("helper.furl", "client", "helper.furl")
111 copy("key_generator.furl", "client", "key_generator.furl")
112 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
113 if os.path.exists(os.path.join(self.basedir, "no_storage")):
114 self.set_config("storage", "enabled", "false")
115 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
116 self.set_config("storage", "readonly", "true")
117 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
118 self.set_config("storage", "debug_discard", "true")
119 if os.path.exists(os.path.join(self.basedir, "run_helper")):
120 self.set_config("helper", "enabled", "true")
122 def init_introducer_client(self):
123 self.introducer_furl = self.get_config("client", "introducer.furl")
124 ic = IntroducerClient(self.tub, self.introducer_furl,
126 str(allmydata.__full_version__),
127 str(self.OLDEST_SUPPORTED_VERSION))
128 self.introducer_client = ic
129 # hold off on starting the IntroducerClient until our tub has been
130 # started, so we'll have a useful address on our RemoteReference, so
131 # that the introducer's status page will show us.
132 d = self.when_tub_ready()
133 def _start_introducer_client(res):
134 ic.setServiceParent(self)
135 d.addCallback(_start_introducer_client)
136 d.addErrback(log.err, facility="tahoe.init",
137 level=log.BAD, umid="URyI5w")
139 def init_stats_provider(self):
140 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
141 self.stats_provider = StatsProvider(self, gatherer_furl)
142 self.add_service(self.stats_provider)
143 self.stats_provider.register_producer(self)
146 return { 'node.uptime': time.time() - self.started_timestamp }
148 def init_lease_secret(self):
149 secret_s = self.get_or_create_private_config("secret", _make_secret)
150 self._lease_secret = base32.a2b(secret_s)
152 def init_storage(self):
153 # should we run a storage server (and publish it for others to use)?
154 if not self.get_config("storage", "enabled", True, boolean=True):
156 readonly = self.get_config("storage", "readonly", False, boolean=True)
158 storedir = os.path.join(self.basedir, self.STOREDIR)
160 data = self.get_config("storage", "reserved_space", None)
163 reserved = parse_abbreviated_size(data)
165 log.msg("[storage]reserved_space= contains unparseable value %s"
169 discard = self.get_config("storage", "debug_discard", False,
172 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
174 mode = self.get_config("storage", "expire.mode") # require a mode
176 mode = self.get_config("storage", "expire.mode", "age")
178 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
179 if o_l_d is not None:
180 o_l_d = parse_duration(o_l_d)
183 if mode == "cutoff-date":
184 cutoff_date = self.get_config("storage", "expire.cutoff_date")
185 cutoff_date = parse_date(cutoff_date)
188 if self.get_config("storage", "expire.immutable", True, boolean=True):
189 sharetypes.append("immutable")
190 if self.get_config("storage", "expire.mutable", True, boolean=True):
191 sharetypes.append("mutable")
192 expiration_sharetypes = tuple(sharetypes)
194 ss = StorageServer(storedir, self.nodeid,
195 reserved_space=reserved,
196 discard_storage=discard,
197 readonly_storage=readonly,
198 stats_provider=self.stats_provider,
199 expiration_enabled=expire,
200 expiration_mode=mode,
201 expiration_override_lease_duration=o_l_d,
202 expiration_cutoff_date=cutoff_date,
203 expiration_sharetypes=expiration_sharetypes)
206 d = self.when_tub_ready()
207 # we can't do registerReference until the Tub is ready
209 furl_file = os.path.join(self.basedir, "private", "storage.furl")
210 furl = self.tub.registerReference(ss, furlFile=furl_file)
211 ri_name = RIStorageServer.__remote_name__
212 self.introducer_client.publish(furl, "storage", ri_name)
213 d.addCallback(_publish)
214 d.addErrback(log.err, facility="tahoe.init",
215 level=log.BAD, umid="aLGBKw")
217 def init_client(self):
218 helper_furl = self.get_config("client", "helper.furl", None)
219 DEP = self.DEFAULT_ENCODING_PARAMETERS
220 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
221 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
222 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
223 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
224 self.convergence = base32.a2b(convergence_s)
225 self._node_cache = weakref.WeakValueDictionary() # uri -> node
227 self.init_client_storage_broker()
228 self.add_service(History(self.stats_provider))
229 self.add_service(Uploader(helper_furl, self.stats_provider))
230 download_cachedir = os.path.join(self.basedir,
231 "private", "cache", "download")
232 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
233 self.download_cache.setServiceParent(self)
234 self.add_service(Downloader(self.stats_provider))
235 self.init_stub_client()
237 def init_client_storage_broker(self):
238 # create a StorageFarmBroker object, for use by Uploader/Downloader
239 # (and everybody else who wants to use storage servers)
240 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
241 self.storage_broker = sb
243 # load static server specifications from tahoe.cfg, if any.
244 # Not quite ready yet.
245 #if self.config.has_section("client-server-selection"):
246 # server_params = {} # maps serverid to dict of parameters
247 # for (name, value) in self.config.items("client-server-selection"):
248 # pieces = name.split(".")
249 # if pieces[0] == "server":
250 # serverid = pieces[1]
251 # if serverid not in server_params:
252 # server_params[serverid] = {}
253 # server_params[serverid][pieces[2]] = value
254 # for serverid, params in server_params.items():
255 # server_type = params.pop("type")
256 # if server_type == "tahoe-foolscap":
257 # s = storage_client.NativeStorageClient(*params)
259 # msg = ("unrecognized server type '%s' in "
260 # "tahoe.cfg [client-server-selection]server.%s.type"
261 # % (server_type, serverid))
262 # raise storage_client.UnknownServerTypeError(msg)
263 # sb.add_server(s.serverid, s)
265 # check to see if we're supposed to use the introducer too
266 if self.get_config("client-server-selection", "use_introducer",
267 default=True, boolean=True):
268 sb.use_introducer(self.introducer_client)
    def get_storage_broker(self):
        # accessor for the broker created in init_client_storage_broker()
        return self.storage_broker
273 def init_stub_client(self):
275 # we publish an empty object so that the introducer can count how
276 # many clients are connected and see what versions they're
279 furl = self.tub.registerReference(sc)
280 ri_name = RIStubClient.__remote_name__
281 self.introducer_client.publish(furl, "stub_client", ri_name)
282 d = self.when_tub_ready()
283 d.addCallback(_publish)
284 d.addErrback(log.err, facility="tahoe.init",
285 level=log.BAD, umid="OEHq3g")
    def get_history(self):
        # the History service is added in init_client()
        return self.getServiceNamed("history")
290 def init_control(self):
291 d = self.when_tub_ready()
294 c.setServiceParent(self)
295 control_url = self.tub.registerReference(c)
296 self.write_private_config("control.furl", control_url + "\n")
297 d.addCallback(_publish)
298 d.addErrback(log.err, facility="tahoe.init",
299 level=log.BAD, umid="d3tNXA")
301 def init_helper(self):
302 d = self.when_tub_ready()
304 h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
305 h.setServiceParent(self)
306 # TODO: this is confusing. BASEDIR/private/helper.furl is created
307 # by the helper. BASEDIR/helper.furl is consumed by the client
308 # who wants to use the helper. I like having the filename be the
309 # same, since that makes 'cp' work smoothly, but the difference
310 # between config inputs and generated outputs is hard to see.
311 helper_furlfile = os.path.join(self.basedir,
312 "private", "helper.furl")
313 self.tub.registerReference(h, furlFile=helper_furlfile)
314 d.addCallback(_publish)
315 d.addErrback(log.err, facility="tahoe.init",
316 level=log.BAD, umid="K0mW5w")
    def init_key_gen(self, key_gen_furl):
        # Arrange to connect to a remote key-generator service (at
        # key_gen_furl) once the Tub is ready; _got_key_generator records the
        # remote reference when the connection is established.
        d = self.when_tub_ready()
        # NOTE(review): the callback parameter is named 'self', shadowing the
        # method's own 'self'. This only works if when_tub_ready() fires with
        # the node itself — confirm against node.Node before renaming.
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")
    def _got_key_generator(self, key_generator):
        # remember the remote key-generator reference and arrange to notice
        # when it disconnects
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)
    def _lost_key_generator(self):
        # the remote key generator went away: _generate_pubprivkeys will fall
        # back to local key generation until a reconnection occurs
        self._key_generator = None
333 def init_web(self, webport):
334 self.log("init_web(webport=%s)", args=(webport,))
336 from allmydata.webish import WebishServer
337 nodeurl_path = os.path.join(self.basedir, "node.url")
338 staticdir = self.get_config("node", "web.static", "public_html")
339 staticdir = os.path.expanduser(staticdir)
340 ws = WebishServer(self, webport, nodeurl_path, staticdir)
343 def init_ftp_server(self):
344 if self.get_config("ftpd", "enabled", False, boolean=True):
345 accountfile = self.get_config("ftpd", "accounts.file", None)
346 accounturl = self.get_config("ftpd", "accounts.url", None)
347 ftp_portstr = self.get_config("ftpd", "port", "8021")
349 from allmydata.frontends import ftpd
350 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
351 s.setServiceParent(self)
353 def init_sftp_server(self):
354 if self.get_config("sftpd", "enabled", False, boolean=True):
355 accountfile = self.get_config("sftpd", "accounts.file", None)
356 accounturl = self.get_config("sftpd", "accounts.url", None)
357 sftp_portstr = self.get_config("sftpd", "port", "8022")
358 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
359 privkey_file = self.get_config("sftpd", "host_privkey_file")
361 from allmydata.frontends import sftpd
362 s = sftpd.SFTPServer(self, accountfile, accounturl,
363 sftp_portstr, pubkey_file, privkey_file)
364 s.setServiceParent(self)
366 def _check_hotline(self, hotline_file):
367 if os.path.exists(hotline_file):
368 mtime = os.stat(hotline_file)[stat.ST_MTIME]
369 if mtime > time.time() - 120.0:
372 self.log("hotline file too old, shutting down")
374 self.log("hotline file missing, shutting down")
    def get_encoding_parameters(self):
        # returns the live per-instance dict (mutated by init_client from
        # tahoe.cfg overrides), not a copy
        return self.DEFAULT_ENCODING_PARAMETERS
380 def connected_to_introducer(self):
381 if self.introducer_client:
382 return self.introducer_client.connected_to_introducer()
    def get_renewal_secret(self):
        # derive the lease-renewal secret from the master lease secret loaded
        # in init_lease_secret()
        return hashutil.my_renewal_secret_hash(self._lease_secret)
    def get_cancel_secret(self):
        # derive the lease-cancel secret from the master lease secret loaded
        # in init_lease_secret()
        return hashutil.my_cancel_secret_hash(self._lease_secret)
391 def debug_wait_for_client_connections(self, num_clients):
392 """Return a Deferred that fires (with None) when we have connections
393 to the given number of peers. Useful for tests that set up a
394 temporary test network and need to know when it is safe to proceed
395 with an upload or download."""
397 return len(self.storage_broker.get_all_servers()) >= num_clients
398 d = self.poll(_check, 0.5)
399 d.addCallback(lambda res: None)
403 # these four methods are the primitives for creating filenodes and
404 # dirnodes. The first takes a URI and produces a filenode or (new-style)
405 # dirnode. The other three create brand-new filenodes/dirnodes.
407 def create_node_from_uri(self, u, readcap=None):
408 # this returns synchronously.
413 if u_s not in self._node_cache:
414 if IReadonlyNewDirectoryURI.providedBy(u):
415 # new-style read-only dirnodes
416 node = NewDirectoryNode(self).init_from_uri(u)
417 elif INewDirectoryURI.providedBy(u):
419 node = NewDirectoryNode(self).init_from_uri(u)
420 elif IFileURI.providedBy(u):
421 if isinstance(u, LiteralFileURI):
422 node = LiteralFileNode(u, self) # LIT
424 key = base32.b2a(u.storage_index)
425 cachefile = self.download_cache.get_file(key)
426 node = FileNode(u, self, cachefile) # CHK
428 assert IMutableFileURI.providedBy(u), u
429 node = MutableFileNode(self).init_from_uri(u)
430 self._node_cache[u_s] = node
431 return self._node_cache[u_s]
433 def create_empty_dirnode(self):
434 n = NewDirectoryNode(self)
435 d = n.create(self._generate_pubprivkeys, self.DEFAULT_MUTABLE_KEYSIZE)
436 d.addCallback(lambda res: n)
439 def create_mutable_file(self, contents="", keysize=None):
440 keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
441 n = MutableFileNode(self)
442 d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
443 d.addCallback(lambda res: n)
446 def _generate_pubprivkeys(self, key_size):
447 if self._key_generator:
448 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
449 def make_key_objs((verifying_key, signing_key)):
450 v = rsa.create_verifying_key_from_string(verifying_key)
451 s = rsa.create_signing_key_from_string(signing_key)
453 d.addCallback(make_key_objs)
456 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
458 signer = rsa.generate(key_size)
459 verifier = signer.get_verifying_key()
460 return verifier, signer
    def upload(self, uploadable):
        # hand the uploadable off to the Uploader service (added in
        # init_client), recording status in the History service
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())
    def list_all_upload_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_upload_statuses()
    def list_all_download_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_download_statuses()
    def list_all_mapupdate_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_mapupdate_statuses()
    def list_all_publish_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_publish_statuses()
    def list_all_retrieve_statuses(self):
        # delegate to the History service
        return self.get_history().list_all_retrieve_statuses()
480 def list_all_helper_statuses(self):
482 helper = self.getServiceNamed("helper")
485 return helper.get_all_upload_statuses()