1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI, UnknownURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.unknown import UnknownNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
# An empty Referenceable published to the introducer (see init_stub_client)
# so the introducer can count connected clients and see their versions.
38 class StubClient(Referenceable):
39     implements(RIStubClient)
# NOTE(review): the enclosing 'def _make_secret():' line is elided from this
# excerpt. The line below generates a random base32-encoded secret (with a
# trailing newline so it round-trips cleanly through a text config file).
42     return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
# The Tahoe client node: a node.Node subclass that starts every client-side
# service (introducer client, optional storage server, uploader/downloader,
# optional helper, web/FTP/SFTP frontends) based on tahoe.cfg settings.
# NOTE(review): several class-attribute lines (e.g. the STOREDIR constant and
# the "happy"/"n" encoding entries, plus the KiB definition) are elided from
# this excerpt.
44 class Client(node.Node, pollmixin.PollMixin):
45     implements(IStatsProducer)
# file (in basedir) recording which TCP port our Tub listens on
47     PORTNUMFILE = "client.port"
# while this file exists and stays fresh, _check_hotline keeps the node
# running; used by tests to shut down abandoned nodes automatically
50     SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
52     # This means that if a storage server treats me as though I were a
53     # 1.0.0 storage client, it will work as they expect.
54     OLDEST_SUPPORTED_VERSION = "1.0.0"
56     # this is a tuple of (needed, desired, total, max_segment_size). 'needed'
57     # is the number of shares required to reconstruct a file. 'desired' means
58     # that we will abort an upload unless we can allocate space for at least
59     # this many. 'total' is the total number of shares created by encoding.
60     # If everybody has room then this is how many we will upload.
61     DEFAULT_ENCODING_PARAMETERS = {"k": 3,
# ("happy" and "n" entries elided from this excerpt)
64     "max_segment_size": 128*KiB,
67     # set this to override the size of the RSA keys created for new mutable
68     # files. The default of None means to let mutable.filenode choose its own
69     # size, which means 2048 bits.
70     DEFAULT_MUTABLE_KEYSIZE = None
# Build the client node rooted at 'basedir', starting all sub-services.
# NOTE(review): several lines are elided in this excerpt (the calls that
# likely start storage/client/helper services around residue lines 80-84,
# and the guards 'if key_gen_furl:' / 'if webport:' around 87 and 104).
72 def __init__(self, basedir="."):
73 node.Node.__init__(self, basedir)
# recorded so get_stats() can report node.uptime
74 self.started_timestamp = time.time()
75 self.logSource="Client"
# copy the class-level dict so per-instance overrides (init_client) don't
# mutate the shared class attribute
76 self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
77 self.init_introducer_client()
78 self.init_stats_provider()
79 self.init_lease_secret()
82 if self.get_config("helper", "enabled", False, boolean=True):
# (body of the helper-enabled branch elided here)
85 self._key_generator = None
86 key_gen_furl = self.get_config("client", "key_generator.furl", None)
88 self.init_key_gen(key_gen_furl)
89 # ControlServer and Helper are attached after Tub startup
90 self.init_ftp_server()
91 self.init_sftp_server()
# tests touch this file to keep a node alive; once it goes stale the
# TimerService below shuts the node down (see _check_hotline)
93 hotline_file = os.path.join(self.basedir,
94 self.SUICIDE_PREVENTION_HOTLINE_FILE)
95 if os.path.exists(hotline_file):
96 age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
97 self.log("hotline file noticed (%ds old), starting timer" % age)
98 hotline = TimerService(1.0, self._check_hotline, hotline_file)
99 hotline.setServiceParent(self)
101 # this needs to happen last, so it can use getServiceNamed() to
102 # acquire references to StorageServer and other web-statusable things
103 webport = self.get_config("node", "web.port", None)
105 self.init_web(webport) # strports string
107 def read_old_config_files(self):
108 node.Node.read_old_config_files(self)
109 copy = self._copy_config_from_file
110 copy("introducer.furl", "client", "introducer.furl")
111 copy("helper.furl", "client", "helper.furl")
112 copy("key_generator.furl", "client", "key_generator.furl")
113 copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
114 if os.path.exists(os.path.join(self.basedir, "no_storage")):
115 self.set_config("storage", "enabled", "false")
116 if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
117 self.set_config("storage", "readonly", "true")
118 if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
119 self.set_config("storage", "debug_discard", "true")
120 if os.path.exists(os.path.join(self.basedir, "run_helper")):
121 self.set_config("helper", "enabled", "true")
# Create the IntroducerClient (used to publish/subscribe to services) and
# start it once the Tub is listening, so our RemoteReference carries a
# useful address for the introducer's status page.
# NOTE(review): one constructor-argument line (residue 126, presumably the
# node id) is elided here; 'allmydata' is referenced below but its import
# is not visible in this excerpt.
123 def init_introducer_client(self):
# [client]introducer.furl is required; get_config with no default raises
# on a missing option -- TODO confirm against node.Node.get_config
124 self.introducer_furl = self.get_config("client", "introducer.furl")
125 ic = IntroducerClient(self.tub, self.introducer_furl,
127 str(allmydata.__full_version__),
128 str(self.OLDEST_SUPPORTED_VERSION))
129 self.introducer_client = ic
130 # hold off on starting the IntroducerClient until our tub has been
131 # started, so we'll have a useful address on our RemoteReference, so
132 # that the introducer's status page will show us.
133 d = self.when_tub_ready()
134 def _start_introducer_client(res):
135 ic.setServiceParent(self)
136 d.addCallback(_start_introducer_client)
137 d.addErrback(log.err, facility="tahoe.init",
138 level=log.BAD, umid="URyI5w")
140 def init_stats_provider(self):
141 gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
142 self.stats_provider = StatsProvider(self, gatherer_furl)
143 self.add_service(self.stats_provider)
144 self.stats_provider.register_producer(self)
# NOTE(review): the enclosing 'def get_stats(self):' line is elided from
# this excerpt; this is the IStatsProducer hook, reporting seconds of
# uptime since __init__ recorded started_timestamp.
147         return { 'node.uptime': time.time() - self.started_timestamp }
149 def init_lease_secret(self):
150 secret_s = self.get_or_create_private_config("secret", _make_secret)
151 self._lease_secret = base32.a2b(secret_s)
# Optionally create a StorageServer, register it with the Tub, and publish
# its FURL to the introducer.
# NOTE(review): several control-flow lines are elided in this excerpt: the
# early 'return' for the disabled case, the try/except around
# parse_abbreviated_size, the second line of the discard get_config call,
# the try/except giving the expire.mode fallback, the 'sharetypes = []'
# initialization, the add_service(ss) call, and 'def _publish(res):'.
153 def init_storage(self):
154 # should we run a storage server (and publish it for others to use)?
155 if not self.get_config("storage", "enabled", True, boolean=True):
# (elided: early return when storage is disabled)
157 readonly = self.get_config("storage", "readonly", False, boolean=True)
159 storedir = os.path.join(self.basedir, self.STOREDIR)
# reserved_space accepts abbreviated sizes (e.g. "5G"); a parse failure is
# logged below rather than fatal -- presumably leaving reserved unset/None,
# TODO confirm against the elided except clause
161 data = self.get_config("storage", "reserved_space", None)
164 reserved = parse_abbreviated_size(data)
166 log.msg("[storage]reserved_space= contains unparseable value %s"
170 discard = self.get_config("storage", "debug_discard", False,
# lease-expiration (share garbage-collection) configuration follows
173 expire = self.get_config("storage", "expire.enabled", False, boolean=True)
# when expiration is on, a mode is required; the elided except branch falls
# back to "age" when expiration is off
175 mode = self.get_config("storage", "expire.mode") # require a mode
177 mode = self.get_config("storage", "expire.mode", "age")
179 o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
180 if o_l_d is not None:
# duration string ("3 months") -> seconds
181 o_l_d = parse_duration(o_l_d)
184 if mode == "cutoff-date":
185 cutoff_date = self.get_config("storage", "expire.cutoff_date")
186 cutoff_date = parse_date(cutoff_date)
# which share types the expirer may delete (both default to enabled)
189 if self.get_config("storage", "expire.immutable", True, boolean=True):
190 sharetypes.append("immutable")
191 if self.get_config("storage", "expire.mutable", True, boolean=True):
192 sharetypes.append("mutable")
193 expiration_sharetypes = tuple(sharetypes)
195 ss = StorageServer(storedir, self.nodeid,
196 reserved_space=reserved,
197 discard_storage=discard,
198 readonly_storage=readonly,
199 stats_provider=self.stats_provider,
200 expiration_enabled=expire,
201 expiration_mode=mode,
202 expiration_override_lease_duration=o_l_d,
203 expiration_cutoff_date=cutoff_date,
204 expiration_sharetypes=expiration_sharetypes)
207 d = self.when_tub_ready()
208 # we can't do registerReference until the Tub is ready
# (elided: 'def _publish(res):' -- the next four lines are its body)
# the FURL is persisted in private/storage.furl so it stays stable
210 furl_file = os.path.join(self.basedir, "private", "storage.furl")
211 furl = self.tub.registerReference(ss, furlFile=furl_file)
212 ri_name = RIStorageServer.__remote_name__
213 self.introducer_client.publish(furl, "storage", ri_name)
214 d.addCallback(_publish)
215 d.addErrback(log.err, facility="tahoe.init",
216 level=log.BAD, umid="aLGBKw")
218 def init_client(self):
219 helper_furl = self.get_config("client", "helper.furl", None)
220 DEP = self.DEFAULT_ENCODING_PARAMETERS
221 DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
222 DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
223 DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
224 convergence_s = self.get_or_create_private_config('convergence', _make_secret)
225 self.convergence = base32.a2b(convergence_s)
226 self._node_cache = weakref.WeakValueDictionary() # uri -> node
228 self.init_client_storage_broker()
229 self.add_service(History(self.stats_provider))
230 self.add_service(Uploader(helper_furl, self.stats_provider))
231 download_cachedir = os.path.join(self.basedir,
232 "private", "cache", "download")
233 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
234 self.download_cache.setServiceParent(self)
235 self.add_service(Downloader(self.stats_provider))
236 self.init_stub_client()
# Build the StorageFarmBroker that Uploader/Downloader use to find storage
# servers; currently servers come only from introducer announcements.
# NOTE(review): a few lines inside the commented-out static-server sketch
# (and a blank line) are elided from this excerpt.
238 def init_client_storage_broker(self):
239 # create a StorageFarmBroker object, for use by Uploader/Downloader
240 # (and everybody else who wants to use storage servers)
241 sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
242 self.storage_broker = sb
244 # load static server specifications from tahoe.cfg, if any.
245 # Not quite ready yet.
246 #if self.config.has_section("client-server-selection"):
247 # server_params = {} # maps serverid to dict of parameters
248 # for (name, value) in self.config.items("client-server-selection"):
249 # pieces = name.split(".")
250 # if pieces[0] == "server":
251 # serverid = pieces[1]
252 # if serverid not in server_params:
253 # server_params[serverid] = {}
254 # server_params[serverid][pieces[2]] = value
255 # for serverid, params in server_params.items():
256 # server_type = params.pop("type")
257 # if server_type == "tahoe-foolscap":
258 # s = storage_client.NativeStorageClient(*params)
260 # msg = ("unrecognized server type '%s' in "
261 # "tahoe.cfg [client-server-selection]server.%s.type"
262 # % (server_type, serverid))
263 # raise storage_client.UnknownServerTypeError(msg)
264 # sb.add_server(s.serverid, s)
266 # check to see if we're supposed to use the introducer too
267 if self.get_config("client-server-selection", "use_introducer",
268 default=True, boolean=True):
269 sb.use_introducer(self.introducer_client)
271 def get_storage_broker(self):
272 return self.storage_broker
# Publish an empty StubClient object to the introducer, purely so it can
# count connected clients and their versions.
# NOTE(review): lines are elided here -- presumably 'sc = StubClient()' and
# the 'def _publish(res):' header; the three registration lines below are
# the callback body, run once the Tub is ready.
274 def init_stub_client(self):
276 # we publish an empty object so that the introducer can count how
277 # many clients are connected and see what versions they're
280 furl = self.tub.registerReference(sc)
281 ri_name = RIStubClient.__remote_name__
282 self.introducer_client.publish(furl, "stub_client", ri_name)
283 d = self.when_tub_ready()
284 d.addCallback(_publish)
285 d.addErrback(log.err, facility="tahoe.init",
286 level=log.BAD, umid="OEHq3g")
288 def get_history(self):
289 return self.getServiceNamed("history")
# Once the Tub is ready, start a ControlServer (debug/RPC interface) and
# write its FURL to private/control.furl for local tools to use.
# NOTE(review): lines elided here -- presumably 'def _publish(res):' and
# 'c = ControlServer()'; the three lines after when_tub_ready() are the
# callback body.
291 def init_control(self):
292 d = self.when_tub_ready()
295 c.setServiceParent(self)
296 control_url = self.tub.registerReference(c)
# trailing newline keeps the furl file friendly to shell tools
297 self.write_private_config("control.furl", control_url + "\n")
298 d.addCallback(_publish)
299 d.addErrback(log.err, facility="tahoe.init",
300 level=log.BAD, umid="d3tNXA")
# Once the Tub is ready, start an upload Helper service and register it,
# persisting its FURL under private/helper.furl.
# NOTE(review): the 'def _publish(res):' header line is elided from this
# excerpt; the lines between when_tub_ready() and addCallback are its body.
302 def init_helper(self):
303 d = self.when_tub_ready()
305 h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
306 h.setServiceParent(self)
307 # TODO: this is confusing. BASEDIR/private/helper.furl is created
308 # by the helper. BASEDIR/helper.furl is consumed by the client
309 # who wants to use the helper. I like having the filename be the
310 # same, since that makes 'cp' work smoothly, but the difference
311 # between config inputs and generated outputs is hard to see.
312 helper_furlfile = os.path.join(self.basedir,
313 "private", "helper.furl")
314 self.tub.registerReference(h, furlFile=helper_furlfile)
315 d.addCallback(_publish)
316 d.addErrback(log.err, facility="tahoe.init",
317 level=log.BAD, umid="K0mW5w")
319 def init_key_gen(self, key_gen_furl):
320 d = self.when_tub_ready()
321 def _subscribe(self):
322 self.tub.connectTo(key_gen_furl, self._got_key_generator)
323 d.addCallback(_subscribe)
324 d.addErrback(log.err, facility="tahoe.init",
325 level=log.BAD, umid="z9DMzw")
# connectTo callback: remember the key-generator RemoteReference and
# arrange to clear it if the connection drops.
327 def _got_key_generator(self, key_generator):
328 self._key_generator = key_generator
329 key_generator.notifyOnDisconnect(self._lost_key_generator)
# disconnect handler: fall back to local key generation (see
# _generate_pubprivkeys) until foolscap reconnects us.
331 def _lost_key_generator(self):
332 self._key_generator = None
# Start the web frontend on 'webport' (a Twisted strports string).
# NOTE(review): lines are elided after the WebishServer construction --
# presumably the line attaching ws as a child service.
334 def init_web(self, webport):
335 self.log("init_web(webport=%s)", args=(webport,))
# imported lazily, consistent with the ftpd/sftpd frontends below
337 from allmydata.webish import WebishServer
# node.url records our base URL for CLI tools
338 nodeurl_path = os.path.join(self.basedir, "node.url")
339 staticdir = self.get_config("node", "web.static", "public_html")
# allow "~/..." in the web.static setting
340 staticdir = os.path.expanduser(staticdir)
341 ws = WebishServer(self, webport, nodeurl_path, staticdir)
344 def init_ftp_server(self):
345 if self.get_config("ftpd", "enabled", False, boolean=True):
346 accountfile = self.get_config("ftpd", "accounts.file", None)
347 accounturl = self.get_config("ftpd", "accounts.url", None)
348 ftp_portstr = self.get_config("ftpd", "port", "8021")
350 from allmydata.frontends import ftpd
351 s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
352 s.setServiceParent(self)
354 def init_sftp_server(self):
355 if self.get_config("sftpd", "enabled", False, boolean=True):
356 accountfile = self.get_config("sftpd", "accounts.file", None)
357 accounturl = self.get_config("sftpd", "accounts.url", None)
358 sftp_portstr = self.get_config("sftpd", "port", "8022")
359 pubkey_file = self.get_config("sftpd", "host_pubkey_file")
360 privkey_file = self.get_config("sftpd", "host_privkey_file")
362 from allmydata.frontends import sftpd
363 s = sftpd.SFTPServer(self, accountfile, accounturl,
364 sftp_portstr, pubkey_file, privkey_file)
365 s.setServiceParent(self)
# TimerService callback (every 1s): keep running only while the hotline
# file exists and was touched within the last ~2 minutes.
# NOTE(review): the 'return' for the fresh-file case, both 'else:' lines,
# and the actual shutdown call after each log message are elided from this
# excerpt -- presumably the node stops itself here; confirm upstream.
367 def _check_hotline(self, hotline_file):
368 if os.path.exists(hotline_file):
369 mtime = os.stat(hotline_file)[stat.ST_MTIME]
# 120s grace period since the last touch
370 if mtime > time.time() - 120.0:
373 self.log("hotline file too old, shutting down")
375 self.log("hotline file missing, shutting down")
# Return the erasure-coding parameter dict ("k"/"n"/"happy"/
# "max_segment_size"). Note this is the live per-instance dict copied in
# __init__ and tweaked in init_client -- callers mutating it affect
# subsequent uploads.
378 def get_encoding_parameters(self):
379 return self.DEFAULT_ENCODING_PARAMETERS
# True when the introducer connection is currently established.
# NOTE(review): the fallback branch (presumably 'return False' when no
# introducer client exists) is elided from this excerpt.
381 def connected_to_introducer(self):
382 if self.introducer_client:
383 return self.introducer_client.connected_to_introducer()
386 def get_renewal_secret(self):
387 return hashutil.my_renewal_secret_hash(self._lease_secret)
389 def get_cancel_secret(self):
390 return hashutil.my_cancel_secret_hash(self._lease_secret)
# NOTE(review): the 'def _check():' header for the poll predicate and the
# trailing 'return d' are elided from this excerpt.
392 def debug_wait_for_client_connections(self, num_clients):
393 """Return a Deferred that fires (with None) when we have connections
394 to the given number of peers. Useful for tests that set up a
395 temporary test network and need to know when it is safe to proceed
396 with an upload or download."""
# poll predicate (elided def): true once enough servers are known
398 return len(self.storage_broker.get_all_servers()) >= num_clients
# PollMixin.poll: re-check every 0.5s until the predicate holds
399 d = self.poll(_check, 0.5)
400 d.addCallback(lambda res: None)
404 # these four methods are the primitives for creating filenodes and
405 # dirnodes. The first takes a URI and produces a filenode or (new-style)
406 # dirnode. The other three create brand-new filenodes/dirnodes.
# Map a write (or read-only) cap to the appropriate node type, memoizing
# live nodes in the weak-valued _node_cache so equal URIs share a node.
# NOTE(review): several lines are elided in this excerpt: the guard around
# the first UnknownNode return (presumably 'if not u:'), the IURI adaption
# and 'u_s = ...' cache-key derivation before residue line 420, and the
# 'else:' lines introducing the CHK and mutable branches.
408 def create_node_from_uri(self, writecap, readcap=None):
409 # this returns synchronously.
410 u = writecap or readcap
412 # maybe the writecap was hidden because we're in a readonly
413 # directory, and the future cap format doesn't have a readcap, or
415 return UnknownNode(writecap, readcap)
417 if isinstance(u, UnknownURI):
418 return UnknownNode(writecap, readcap)
420 if u_s not in self._node_cache:
421 if IReadonlyNewDirectoryURI.providedBy(u):
422 # new-style read-only dirnodes
423 node = NewDirectoryNode(self).init_from_uri(u)
424 elif INewDirectoryURI.providedBy(u):
426 node = NewDirectoryNode(self).init_from_uri(u)
427 elif IFileURI.providedBy(u):
428 if isinstance(u, LiteralFileURI):
429 node = LiteralFileNode(u, self) # LIT
# (elided 'else:') CHK files share the download cache, keyed by
# base32-encoded storage index
431 key = base32.b2a(u.storage_index)
432 cachefile = self.download_cache.get_file(key)
433 node = FileNode(u, self, cachefile) # CHK
# (elided 'else:') anything left must be a mutable-file cap
435 assert IMutableFileURI.providedBy(u), u
436 node = MutableFileNode(self).init_from_uri(u)
437 self._node_cache[u_s] = node # note: WeakValueDictionary
438 return self._node_cache[u_s]
# Create a brand-new (mutable-file-backed) directory node; fires with the
# NewDirectoryNode. NOTE(review): the trailing 'return d' is elided from
# this excerpt.
440 def create_empty_dirnode(self):
441 d = self.create_mutable_file()
442 d.addCallback(NewDirectoryNode.create_with_mutablefile, self)
# Create a new mutable file holding 'contents'; fires with the
# MutableFileNode. 'keysize' (bits) defaults to DEFAULT_MUTABLE_KEYSIZE
# (None lets mutable.filenode pick, per the class comment).
# NOTE(review): the trailing 'return d' is elided from this excerpt.
445 def create_mutable_file(self, contents="", keysize=None):
446 keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
447 n = MutableFileNode(self)
448 d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
449 d.addCallback(lambda res: n)
# Produce an RSA (verifying_key, signing_key) pair for a new mutable file:
# remotely via the key-generator service when one is connected, otherwise
# locally (blocking) with pycryptopp.
# NOTE(review): elided lines in this excerpt include the remote branch's
# 'return (v, s)' / 'return d' and the 'else:' introducing the local
# branch, so the remote path returns a Deferred while the local path
# returns the tuple directly -- callers presumably handle both.
452 def _generate_pubprivkeys(self, key_size):
453 if self._key_generator:
454 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
# py2 tuple-parameter unpacking of the remote (verifying, signing) strings
455 def make_key_objs((verifying_key, signing_key)):
456 v = rsa.create_verifying_key_from_string(verifying_key)
457 s = rsa.create_signing_key_from_string(signing_key)
459 d.addCallback(make_key_objs)
462 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
464 signer = rsa.generate(key_size)
465 verifier = signer.get_verifying_key()
466 return verifier, signer
468 def upload(self, uploadable):
469 uploader = self.getServiceNamed("uploader")
470 return uploader.upload(uploadable, history=self.get_history())
# Thin delegators to the History service, which retains the recent status
# objects for each operation type; consumed by the web status pages.
473 def list_all_upload_statuses(self):
474 return self.get_history().list_all_upload_statuses()
476 def list_all_download_statuses(self):
477 return self.get_history().list_all_download_statuses()
479 def list_all_mapupdate_statuses(self):
480 return self.get_history().list_all_mapupdate_statuses()
481 def list_all_publish_statuses(self):
482 return self.get_history().list_all_publish_statuses()
483 def list_all_retrieve_statuses(self):
484 return self.get_history().list_all_retrieve_statuses()
# Upload statuses from the local Helper service, if one is running.
# NOTE(review): the surrounding try/except is elided from this excerpt --
# presumably getServiceNamed raises KeyError when no helper service exists
# and the method returns an empty list in that case; confirm upstream.
486 def list_all_helper_statuses(self):
488 helper = self.getServiceNamed("helper")
491 return helper.get_all_upload_statuses()