1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI, UnknownURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.unknown import UnknownNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30 IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient, \
class StubClient(Referenceable):
    """Empty foolscap Referenceable that declares the RIStubClient interface.

    The class defines no methods or state of its own.
    """
    implements(RIStubClient)
# NOTE(review): the `def` header was absent from this listing; the name is
# taken from the `_make_secret` references passed to
# get_or_create_private_config() elsewhere in this file.
def _make_secret():
    """Return a fresh random secret as a base32 string ending in a newline.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes read from os.urandom().
    """
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
45 class Client(node.Node, pollmixin.PollMixin):
46 implements(IStatsProducer)
48 PORTNUMFILE = "client.port"
51 SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
53 # This means that if a storage server treats me as though I were a
54 # 1.0.0 storage client, it will work as they expect.
55 OLDEST_SUPPORTED_VERSION = "1.0.0"
# this is a dict of encoding parameters: 'k' (needed), 'happy' (desired),
# 'n' (total) and 'max_segment_size'. 'needed' is the number of shares
# required to reconstruct a file. 'desired' means that we will abort an
# upload unless we can allocate space for at least this many. 'total' is the
# total number of shares created by encoding. If everybody has room then
# this is how many we will upload.
62 DEFAULT_ENCODING_PARAMETERS = {"k": 3,
65 "max_segment_size": 128*KiB,
68 # set this to override the size of the RSA keys created for new mutable
69 # files. The default of None means to let mutable.filenode choose its own
70 # size, which means 2048 bits.
71 DEFAULT_MUTABLE_KEYSIZE = None
def __init__(self, basedir="."):
    """Set up a client node rooted at `basedir`.

    Runs the init_* steps in a fixed order: introducer client, stats
    provider, lease secret, storage server, control server, optional
    helper, client services, optional key generator, FTP/SFTP frontends,
    the optional hotline timer, and finally the web server.
    """
    # NOTE(review): this listing had lost the short lines of this method
    # (the init_storage/init_control/init_helper/init_client calls and the
    # `if key_gen_furl:` / `if webport:` guards). They are restored here
    # from the otherwise-uncalled init_* methods and the dangling
    # conditions; confirm against the canonical source.
    node.Node.__init__(self, basedir)
    self.started_timestamp = time.time()
    self.logSource="Client"
    # take a per-instance copy so init_client() can apply config overrides
    # without mutating the class-level defaults
    self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
    self.init_introducer_client()
    self.init_stats_provider()
    self.init_lease_secret()
    self.init_storage()
    self.init_control()
    if self.get_config("helper", "enabled", False, boolean=True):
        self.init_helper()
    self.init_client()
    self._key_generator = None
    key_gen_furl = self.get_config("client", "key_generator.furl", None)
    # only connect to a remote key generator when one is configured
    if key_gen_furl:
        self.init_key_gen(key_gen_furl)
    # ControlServer and Helper are attached after Tub startup
    self.init_ftp_server()
    self.init_sftp_server()

    hotline_file = os.path.join(self.basedir,
                                self.SUICIDE_PREVENTION_HOTLINE_FILE)
    if os.path.exists(hotline_file):
        age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
        self.log("hotline file noticed (%ds old), starting timer" % age)
        # re-check the hotline file once per second
        hotline = TimerService(1.0, self._check_hotline, hotline_file)
        hotline.setServiceParent(self)

    # this needs to happen last, so it can use getServiceNamed() to
    # acquire references to StorageServer and other web-statusable things
    webport = self.get_config("node", "web.port", None)
    if webport:
        self.init_web(webport) # strports string
def read_old_config_files(self):
    """Import legacy per-file settings from the base directory into config.

    After the base class has done its own import, each legacy *.furl file
    is copied into the [client] section under the same name, and each
    legacy marker file that exists sets the corresponding option.
    """
    node.Node.read_old_config_files(self)
    for furl_name in ("introducer.furl",
                      "helper.furl",
                      "key_generator.furl",
                      "stats_gatherer.furl"):
        self._copy_config_from_file(furl_name, "client", furl_name)
    # (marker file in basedir, config section, option, value to set)
    legacy_markers = (
        ("no_storage", "storage", "enabled", "false"),
        ("readonly_storage", "storage", "readonly", "true"),
        ("debug_discard_storage", "storage", "debug_discard", "true"),
        ("run_helper", "helper", "enabled", "true"),
    )
    for marker, section, option, value in legacy_markers:
        if os.path.exists(os.path.join(self.basedir, marker)):
            self.set_config(section, option, value)
def init_introducer_client(self):
    """Create the IntroducerClient and attach it once the Tub is ready.

    Reads [client]introducer.furl from the config (no default is passed)
    into self.introducer_furl and stores the new client on
    self.introducer_client.
    """
    self.introducer_furl = self.get_config("client", "introducer.furl")
    # NOTE(review): the listing's embedded numbering skips one line inside
    # this argument list, so an argument may be missing here -- confirm
    # against IntroducerClient's signature before relying on this call.
    ic = IntroducerClient(self.tub, self.introducer_furl,
                          str(allmydata.__full_version__),
                          str(self.OLDEST_SUPPORTED_VERSION))
    self.introducer_client = ic
    # hold off on starting the IntroducerClient until our tub has been
    # started, so we'll have a useful address on our RemoteReference, so
    # that the introducer's status page will show us.
    d = self.when_tub_ready()
    def _start_introducer_client(res):
        ic.setServiceParent(self)
    d.addCallback(_start_introducer_client)
    # any failure from the callback above is handed to log.err
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="URyI5w")
def init_stats_provider(self):
    """Create the StatsProvider service and register this node with it.

    The gatherer FURL comes from [client]stats_gatherer.furl and may be
    None.
    """
    furl = self.get_config("client", "stats_gatherer.furl", None)
    provider = StatsProvider(self, furl)
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
# NOTE(review): the `def` header was absent from this listing. The name
# `get_stats` is presumed from this class declaring IStatsProducer and
# registering itself as a producer -- confirm against that interface.
def get_stats(self):
    """Return a dict with one key, 'node.uptime'.

    The value is the number of seconds elapsed since started_timestamp.
    """
    return { 'node.uptime': time.time() - self.started_timestamp }
def init_lease_secret(self):
    """Load (or create) the private "secret" config value.

    The stored value is base32 text; the decoded bytes are kept on
    self._lease_secret.
    """
    self._lease_secret = base32.a2b(
        self.get_or_create_private_config("secret", _make_secret))
def init_storage(self):
    """Create the StorageServer, if enabled, and publish it.

    Reads the [storage] config section: enabled, readonly, reserved_space,
    debug_discard and the expire.* lease-expiration options. Does nothing
    when [storage]enabled is false. Once the Tub is ready the server is
    registered and its FURL is published through the introducer client.
    """
    # NOTE(review): this listing had lost the short lines of this method
    # (early return, try/except, else branch, the `cutoff_date` and
    # `sharetypes` initialisers, the add_service call and the `_publish`
    # header). They are restored here from the dangling conditions and
    # unbound names; confirm against the canonical source.

    # should we run a storage server (and publish it for others to use)?
    if not self.get_config("storage", "enabled", True, boolean=True):
        return
    readonly = self.get_config("storage", "readonly", False, boolean=True)

    storedir = os.path.join(self.basedir, self.STOREDIR)

    data = self.get_config("storage", "reserved_space", None)
    reserved = None
    try:
        reserved = parse_abbreviated_size(data)
    except ValueError:
        log.msg("[storage]reserved_space= contains unparseable value %s"
                % data)
    # an unset or unparseable value means "reserve nothing"
    if reserved is None:
        reserved = 0
    discard = self.get_config("storage", "debug_discard", False,
                              boolean=True)

    expire = self.get_config("storage", "expire.enabled", False, boolean=True)
    if expire:
        mode = self.get_config("storage", "expire.mode") # require a mode
    else:
        mode = self.get_config("storage", "expire.mode", "age")

    o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
    if o_l_d is not None:
        o_l_d = parse_duration(o_l_d)

    # the cutoff date is only read in "cutoff-date" mode
    cutoff_date = None
    if mode == "cutoff-date":
        cutoff_date = self.get_config("storage", "expire.cutoff_date")
        cutoff_date = parse_date(cutoff_date)

    sharetypes = []
    if self.get_config("storage", "expire.immutable", True, boolean=True):
        sharetypes.append("immutable")
    if self.get_config("storage", "expire.mutable", True, boolean=True):
        sharetypes.append("mutable")
    expiration_sharetypes = tuple(sharetypes)

    ss = StorageServer(storedir, self.nodeid,
                       reserved_space=reserved,
                       discard_storage=discard,
                       readonly_storage=readonly,
                       stats_provider=self.stats_provider,
                       expiration_enabled=expire,
                       expiration_mode=mode,
                       expiration_override_lease_duration=o_l_d,
                       expiration_cutoff_date=cutoff_date,
                       expiration_sharetypes=expiration_sharetypes)
    self.add_service(ss)

    d = self.when_tub_ready()
    # we can't do registerReference until the Tub is ready
    def _publish(res):
        furl_file = os.path.join(self.basedir, "private", "storage.furl")
        furl = self.tub.registerReference(ss, furlFile=furl_file)
        ri_name = RIStorageServer.__remote_name__
        self.introducer_client.publish(furl, "storage", ri_name)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="aLGBKw")
def init_client(self):
    """Set up the client-side services: history, uploader, downloader.

    Applies [client]shares.needed/total/happy overrides to this instance's
    encoding parameters, loads the convergence secret, creates the node
    cache and storage broker, and finally publishes the stub client.
    """
    helper_furl = self.get_config("client", "helper.furl", None)
    params = self.DEFAULT_ENCODING_PARAMETERS
    # (encoding-parameter key, [client] config option that overrides it)
    for key, option in (("k", "shares.needed"),
                        ("n", "shares.total"),
                        ("happy", "shares.happy")):
        params[key] = int(self.get_config("client", option, params[key]))
    self.convergence = base32.a2b(
        self.get_or_create_private_config('convergence', _make_secret))
    self._node_cache = weakref.WeakValueDictionary() # uri -> node

    self.init_client_storage_broker()
    self.add_service(History(self.stats_provider))
    self.add_service(Uploader(helper_furl, self.stats_provider))
    cache_path = os.path.join(self.basedir, "private", "cache", "download")
    dirman = cachedir.CacheDirectoryManager(cache_path)
    self.download_cache_dirman = dirman
    dirman.setServiceParent(self)
    self.add_service(Downloader(self.stats_provider))
    self.init_stub_client()
def init_client_storage_broker(self):
    """Create the StorageFarmBroker and store it on self.storage_broker.

    The broker is told to use the introducer client unless
    [client-server-selection]use_introducer is set false.
    """
    # create a StorageFarmBroker object, for use by Uploader/Downloader
    # (and everybody else who wants to use storage servers)
    broker = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
    self.storage_broker = broker

    # load static server specifications from tahoe.cfg, if any.
    # Not quite ready yet.
    #if self.config.has_section("client-server-selection"):
    #    server_params = {} # maps serverid to dict of parameters
    #    for (name, value) in self.config.items("client-server-selection"):
    #        pieces = name.split(".")
    #        if pieces[0] == "server":
    #            serverid = pieces[1]
    #            if serverid not in server_params:
    #                server_params[serverid] = {}
    #            server_params[serverid][pieces[2]] = value
    #    for serverid, params in server_params.items():
    #        server_type = params.pop("type")
    #        if server_type == "tahoe-foolscap":
    #            s = storage_client.NativeStorageClient(*params)
    #        else:
    #            msg = ("unrecognized server type '%s' in "
    #                   "tahoe.cfg [client-server-selection]server.%s.type"
    #                   % (server_type, serverid))
    #            raise storage_client.UnknownServerTypeError(msg)
    #        sb.add_server(s.serverid, s)

    # check to see if we're supposed to use the introducer too
    wants_introducer = self.get_config("client-server-selection",
                                       "use_introducer",
                                       default=True, boolean=True)
    if wants_introducer:
        broker.use_introducer(self.introducer_client)
def get_storage_broker(self):
    """Return the object stored on self.storage_broker."""
    broker = self.storage_broker
    return broker
def init_stub_client(self):
    """Publish a StubClient through the introducer once the Tub is ready."""
    # NOTE(review): the `_publish` header and the `sc = StubClient()` line
    # were absent from this listing (both names were used but never bound);
    # they are restored here -- confirm against the canonical source.
    def _publish(res):
        # we publish an empty object so that the introducer can count how
        # many clients are connected and see what versions they're
        # running.
        sc = StubClient()
        furl = self.tub.registerReference(sc)
        ri_name = RIStubClient.__remote_name__
        self.introducer_client.publish(furl, "stub_client", ri_name)
    d = self.when_tub_ready()
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="OEHq3g")
def get_history(self):
    """Return the child service registered under the name "history"."""
    history_service = self.getServiceNamed("history")
    return history_service
def init_control(self):
    """Attach a ControlServer once the Tub is ready and record its FURL.

    The FURL returned by registerReference is written, with a trailing
    newline, to the private config entry "control.furl".
    """
    d = self.when_tub_ready()
    # NOTE(review): the `_publish` header and the `c = ControlServer()`
    # line were absent from this listing (both names were used but never
    # bound); they are restored here -- confirm against the canonical source.
    def _publish(res):
        c = ControlServer()
        c.setServiceParent(self)
        control_url = self.tub.registerReference(c)
        self.write_private_config("control.furl", control_url + "\n")
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="d3tNXA")
def init_helper(self):
    """Attach a Helper service once the Tub is ready and register it.

    The helper works out of BASEDIR/helper and its FURL is written to
    BASEDIR/private/helper.furl by registerReference.
    """
    d = self.when_tub_ready()
    # NOTE(review): the `_publish` header was absent from this listing (the
    # name was passed to addCallback but never defined); it is restored
    # here -- confirm against the canonical source.
    def _publish(res):
        h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
        h.setServiceParent(self)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created
        # by the helper. BASEDIR/helper.furl is consumed by the client
        # who wants to use the helper. I like having the filename be the
        # same, since that makes 'cp' work smoothly, but the difference
        # between config inputs and generated outputs is hard to see.
        helper_furlfile = os.path.join(self.basedir,
                                       "private", "helper.furl")
        self.tub.registerReference(h, furlFile=helper_furlfile)
    d.addCallback(_publish)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="K0mW5w")
def init_key_gen(self, key_gen_furl):
    """Connect to the remote key generator once the Tub is ready.

    Each time a connection to `key_gen_furl` is established, the Tub calls
    self._got_key_generator with the remote reference.
    """
    d = self.when_tub_ready()
    # The callback argument is named `res`, as in the other init_*
    # callbacks, so that `self` below is always this client rather than
    # whatever value the Deferred fires with.
    def _subscribe(res):
        self.tub.connectTo(key_gen_furl, self._got_key_generator)
    d.addCallback(_subscribe)
    d.addErrback(log.err, facility="tahoe.init",
                 level=log.BAD, umid="z9DMzw")
328 def _got_key_generator(self, key_generator):
329 self._key_generator = key_generator
330 key_generator.notifyOnDisconnect(self._lost_key_generator)
332 def _lost_key_generator(self):
333 self._key_generator = None
def init_web(self, webport):
    """Create the WebishServer for `webport` and attach it to this node.

    The static-file directory comes from [node]web.static (default
    "public_html") with "~" expanded. BASEDIR/node.url is passed to the
    server as nodeurl_path.
    """
    self.log("init_web(webport=%s)", args=(webport,))

    # imported here rather than at module level, as in the original
    from allmydata.webish import WebishServer
    nodeurl_path = os.path.join(self.basedir, "node.url")
    staticdir = self.get_config("node", "web.static", "public_html")
    staticdir = os.path.expanduser(staticdir)
    ws = WebishServer(self, webport, nodeurl_path, staticdir)
    # NOTE(review): the line attaching `ws` was absent from this listing,
    # leaving the server constructed but never started. add_service() is
    # used as for the other services here -- confirm against the canonical
    # source.
    self.add_service(ws)
def init_ftp_server(self):
    """Start the FTP frontend when [ftpd]enabled is true.

    Reads accounts.file, accounts.url and port (default "8021") from the
    [ftpd] section and attaches an FTPServer to this node.
    """
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    ftp_portstr = self.get_config("ftpd", "port", "8021")

    # the frontend module is only imported when the frontend is enabled
    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
    server.setServiceParent(self)
def init_sftp_server(self):
    """Start the SFTP frontend when [sftpd]enabled is true.

    Reads accounts.file, accounts.url, port (default "8022"),
    host_pubkey_file and host_privkey_file from the [sftpd] section and
    attaches an SFTPServer to this node. The two host key options are
    read without a default.
    """
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    sftp_portstr = self.get_config("sftpd", "port", "8022")
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")

    # the frontend module is only imported when the frontend is enabled
    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              sftp_portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
368 def _check_hotline(self, hotline_file):
369 if os.path.exists(hotline_file):
370 mtime = os.stat(hotline_file)[stat.ST_MTIME]
371 if mtime > time.time() - 120.0:
374 self.log("hotline file too old, shutting down")
376 self.log("hotline file missing, shutting down")
def get_encoding_parameters(self):
    """Return self.DEFAULT_ENCODING_PARAMETERS (the dict itself, not a copy)."""
    params = self.DEFAULT_ENCODING_PARAMETERS
    return params
def connected_to_introducer(self):
    """Report whether the introducer client is connected.

    Delegates to self.introducer_client.connected_to_introducer() when an
    introducer client is set. Returns an explicit False otherwise, instead
    of falling through to an implicit None.
    """
    if self.introducer_client:
        return self.introducer_client.connected_to_introducer()
    return False
def get_renewal_secret(self):
    """Return hashutil.my_renewal_secret_hash() of the lease secret."""
    lease_secret = self._lease_secret
    return hashutil.my_renewal_secret_hash(lease_secret)
def get_cancel_secret(self):
    """Return hashutil.my_cancel_secret_hash() of the lease secret."""
    lease_secret = self._lease_secret
    return hashutil.my_cancel_secret_hash(lease_secret)
def debug_wait_for_client_connections(self, num_clients):
    """Return a Deferred that fires (with None) when we have connections
    to the given number of peers. Useful for tests that set up a
    temporary test network and need to know when it is safe to proceed
    with an upload or download."""
    # NOTE(review): the `_check` header and the final `return d` were
    # absent from this listing (`_check` was passed to poll() but never
    # defined, and the docstring promises a Deferred); they are restored
    # here -- confirm against the canonical source.
    def _check():
        return len(self.storage_broker.get_all_servers()) >= num_clients
    d = self.poll(_check, 0.5)
    # discard poll()'s result so the Deferred fires with None
    d.addCallback(lambda res: None)
    return d
405 # these four methods are the primitives for creating filenodes and
406 # dirnodes. The first takes a URI and produces a filenode or (new-style)
407 # dirnode. The other three create brand-new filenodes/dirnodes.
def create_node_from_uri(self, writecap, readcap=None):
    """Return the node object for a cap, creating and caching it if needed.

    Returns synchronously. `writecap` is preferred over `readcap`. When
    neither is given, or the cap adapts to an UnknownURI, an UnknownNode is
    returned. Nodes are cached by the cap's string form in
    self._node_cache.

    Raises UnhandledCapTypeError for a recognized cap type that has no
    node class here.
    """
    # NOTE(review): `if not u:`, `u = IURI(u)`, `u_s = u.to_string()` and
    # two `else:` lines were absent from this listing; they are restored
    # here from the dangling return, the isinstance check and the unbound
    # `u_s`. The `to_string()` spelling in particular should be confirmed
    # against the URI classes.
    u = writecap or readcap
    if not u:
        # maybe the writecap was hidden because we're in a readonly
        # directory, and the future cap format doesn't have a readcap, or
        # something.
        return UnknownNode(writecap, readcap)
    u = IURI(u)
    if isinstance(u, UnknownURI):
        return UnknownNode(writecap, readcap)
    u_s = u.to_string()
    # one lookup that also takes a strong reference, so the entry cannot
    # vanish from the WeakValueDictionary between test and use
    cached = self._node_cache.get(u_s)
    if cached is not None:
        return cached
    # named `new_node` so the module-level `node` import is not shadowed
    if IReadonlyNewDirectoryURI.providedBy(u):
        # new-style read-only dirnodes
        new_node = NewDirectoryNode(self).init_from_uri(u)
    elif INewDirectoryURI.providedBy(u):
        # new-style dirnodes
        new_node = NewDirectoryNode(self).init_from_uri(u)
    elif IFileURI.providedBy(u):
        if isinstance(u, LiteralFileURI):
            new_node = LiteralFileNode(u, self) # LIT
        else:
            new_node = FileNode(u, self, self.download_cache_dirman) # CHK
    elif IMutableFileURI.providedBy(u):
        new_node = MutableFileNode(self).init_from_uri(u)
    else:
        raise UnhandledCapTypeError("cap is recognized, but has no Node")
    self._node_cache[u_s] = new_node # note: WeakValueDictionary
    return new_node
def create_empty_dirnode(self):
    """Create a new mutable file and wrap it as a directory node.

    Returns the Deferred from create_mutable_file() with
    NewDirectoryNode.create_with_mutablefile chained onto it.
    """
    d = self.create_mutable_file()
    d.addCallback(NewDirectoryNode.create_with_mutablefile, self)
    # NOTE(review): `return d` was absent from this listing, which left
    # callers with None instead of the Deferred built above.
    return d
def create_mutable_file(self, contents="", keysize=None):
    """Create a new mutable file holding `contents`.

    `keysize` falls back to self.DEFAULT_MUTABLE_KEYSIZE when it is falsy.
    Returns the Deferred from MutableFileNode.create(), with a callback
    added that replaces the result with the new MutableFileNode.
    """
    keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
    n = MutableFileNode(self)
    d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
    d.addCallback(lambda res: n)
    # NOTE(review): `return d` was absent from this listing, which left
    # callers with None instead of the Deferred built above.
    return d
452 def _generate_pubprivkeys(self, key_size):
453 if self._key_generator:
454 d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
455 def make_key_objs((verifying_key, signing_key)):
456 v = rsa.create_verifying_key_from_string(verifying_key)
457 s = rsa.create_signing_key_from_string(signing_key)
459 d.addCallback(make_key_objs)
462 # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
464 signer = rsa.generate(key_size)
465 verifier = signer.get_verifying_key()
466 return verifier, signer
def upload(self, uploadable):
    """Hand `uploadable` to the "uploader" service and return its result.

    The node's history object is passed along as the `history` keyword.
    """
    uploader_service = self.getServiceNamed("uploader")
    history = self.get_history()
    return uploader_service.upload(uploadable, history=history)
def list_all_upload_statuses(self):
    """Return the history object's list_all_upload_statuses() result."""
    history = self.get_history()
    return history.list_all_upload_statuses()
def list_all_download_statuses(self):
    """Return the history object's list_all_download_statuses() result."""
    history = self.get_history()
    return history.list_all_download_statuses()
def list_all_mapupdate_statuses(self):
    """Return the history object's list_all_mapupdate_statuses() result."""
    history = self.get_history()
    return history.list_all_mapupdate_statuses()
def list_all_publish_statuses(self):
    """Return the history object's list_all_publish_statuses() result."""
    history = self.get_history()
    return history.list_all_publish_statuses()
def list_all_retrieve_statuses(self):
    """Return the history object's list_all_retrieve_statuses() result."""
    history = self.get_history()
    return history.list_all_retrieve_statuses()
def list_all_helper_statuses(self):
    """Return the helper's upload statuses, or [] when no helper is running.

    The "helper" service is only attached when the helper is enabled, so a
    KeyError from getServiceNamed() is treated as "no statuses".
    """
    try:
        helper = self.getServiceNamed("helper")
    except KeyError:
        return []
    return helper.get_all_upload_statuses()