]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/client.py
Allow tests to pass with -OO by turning some AssertionErrors (the ones that
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / client.py
1 import os, stat, time, weakref
2 from allmydata.interfaces import RIStorageServer
3 from allmydata import node
4
5 from zope.interface import implements
6 from twisted.internet import reactor
7 from twisted.application.internet import TimerService
8 from foolscap.api import Referenceable
9 from pycryptopp.publickey import rsa
10
11 import allmydata
12 from allmydata.storage.server import StorageServer
13 from allmydata import storage_client
14 from allmydata.immutable.upload import Uploader
15 from allmydata.immutable.download import Downloader
16 from allmydata.immutable.filenode import FileNode, LiteralFileNode
17 from allmydata.immutable.offloaded import Helper
18 from allmydata.control import ControlServer
19 from allmydata.introducer.client import IntroducerClient
20 from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21 from allmydata.util.abbreviate import parse_abbreviated_size
22 from allmydata.util.time_format import parse_duration, parse_date
23 from allmydata.uri import LiteralFileURI, UnknownURI
24 from allmydata.dirnode import NewDirectoryNode
25 from allmydata.mutable.filenode import MutableFileNode
26 from allmydata.unknown import UnknownNode
27 from allmydata.stats import StatsProvider
28 from allmydata.history import History
29 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
30      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient, \
31      UnhandledCapTypeError
32
# Binary size multipliers, in bytes; each unit is 1024 times the previous one.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
38
class StubClient(Referenceable):
    """Empty remotely-referenceable object declaring RIStubClient.

    Client.init_stub_client() registers an instance with the Tub and
    publishes it to the introducer, so the introducer can count connected
    clients and see which versions they run.
    """
    implements(RIStubClient)
41
def _make_secret():
    """Return a new random secret as base32 text terminated by a newline.

    The secret is hashutil.CRYPTO_VAL_SIZE bytes taken from os.urandom().
    """
    raw_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(raw_bytes)
    return encoded + "\n"
44
class Client(node.Node, pollmixin.PollMixin):
    """Client node built on node.Node.

    __init__ reads the node configuration through get_config() and attaches
    the introducer client, stats provider, optional storage server, control
    server, optional helper, uploader/downloader and the optional FTP, SFTP
    and web frontends.
    """
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a dict with the keys "k" (needed), "happy" (desired), "n"
    # (total) and "max_segment_size". 'needed' is the number of shares
    # required to reconstruct a file. 'desired' means that we will abort an
    # upload unless we can allocate space for at least this many. 'total' is
    # the total number of shares created by encoding. If everybody has room
    # then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }

    # set this to override the size of the RSA keys created for new mutable
    # files. The default of None means to let mutable.filenode choose its own
    # size, which means 2048 bits.
    DEFAULT_MUTABLE_KEYSIZE = None
72
    def __init__(self, basedir="."):
        """Build the client node rooted at basedir.

        Runs the init_* steps in a fixed order: introducer client, stats
        provider, lease secret, storage server, control server, optional
        helper, client-side services, optional key generator, FTP and SFTP
        frontends, the optional hotline timer, and finally the web server.
        """
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        # take a per-instance copy: init_client() overwrites entries in it,
        # and that must not leak into the class-level default dict
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        # stays None until a remote key generator connects (see
        # _got_key_generator / _lost_key_generator)
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        # if the hotline file exists at startup, poll it once per second;
        # _check_hotline stops the reactor when the file goes stale or
        # disappears
        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string
107
108     def read_old_config_files(self):
109         node.Node.read_old_config_files(self)
110         copy = self._copy_config_from_file
111         copy("introducer.furl", "client", "introducer.furl")
112         copy("helper.furl", "client", "helper.furl")
113         copy("key_generator.furl", "client", "key_generator.furl")
114         copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
115         if os.path.exists(os.path.join(self.basedir, "no_storage")):
116             self.set_config("storage", "enabled", "false")
117         if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
118             self.set_config("storage", "readonly", "true")
119         if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
120             self.set_config("storage", "debug_discard", "true")
121         if os.path.exists(os.path.join(self.basedir, "run_helper")):
122             self.set_config("helper", "enabled", "true")
123
124     def init_introducer_client(self):
125         self.introducer_furl = self.get_config("client", "introducer.furl")
126         ic = IntroducerClient(self.tub, self.introducer_furl,
127                               self.nickname,
128                               str(allmydata.__full_version__),
129                               str(self.OLDEST_SUPPORTED_VERSION))
130         self.introducer_client = ic
131         # hold off on starting the IntroducerClient until our tub has been
132         # started, so we'll have a useful address on our RemoteReference, so
133         # that the introducer's status page will show us.
134         d = self.when_tub_ready()
135         def _start_introducer_client(res):
136             ic.setServiceParent(self)
137         d.addCallback(_start_introducer_client)
138         d.addErrback(log.err, facility="tahoe.init",
139                      level=log.BAD, umid="URyI5w")
140
141     def init_stats_provider(self):
142         gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
143         self.stats_provider = StatsProvider(self, gatherer_furl)
144         self.add_service(self.stats_provider)
145         self.stats_provider.register_producer(self)
146
147     def get_stats(self):
148         return { 'node.uptime': time.time() - self.started_timestamp }
149
150     def init_lease_secret(self):
151         secret_s = self.get_or_create_private_config("secret", _make_secret)
152         self._lease_secret = base32.a2b(secret_s)
153
154     def init_storage(self):
155         # should we run a storage server (and publish it for others to use)?
156         if not self.get_config("storage", "enabled", True, boolean=True):
157             return
158         readonly = self.get_config("storage", "readonly", False, boolean=True)
159
160         storedir = os.path.join(self.basedir, self.STOREDIR)
161
162         data = self.get_config("storage", "reserved_space", None)
163         reserved = None
164         try:
165             reserved = parse_abbreviated_size(data)
166         except ValueError:
167             log.msg("[storage]reserved_space= contains unparseable value %s"
168                     % data)
169         if reserved is None:
170             reserved = 0
171         discard = self.get_config("storage", "debug_discard", False,
172                                   boolean=True)
173
174         expire = self.get_config("storage", "expire.enabled", False, boolean=True)
175         if expire:
176             mode = self.get_config("storage", "expire.mode") # require a mode
177         else:
178             mode = self.get_config("storage", "expire.mode", "age")
179
180         o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
181         if o_l_d is not None:
182             o_l_d = parse_duration(o_l_d)
183
184         cutoff_date = None
185         if mode == "cutoff-date":
186             cutoff_date = self.get_config("storage", "expire.cutoff_date")
187             cutoff_date = parse_date(cutoff_date)
188
189         sharetypes = []
190         if self.get_config("storage", "expire.immutable", True, boolean=True):
191             sharetypes.append("immutable")
192         if self.get_config("storage", "expire.mutable", True, boolean=True):
193             sharetypes.append("mutable")
194         expiration_sharetypes = tuple(sharetypes)
195
196         ss = StorageServer(storedir, self.nodeid,
197                            reserved_space=reserved,
198                            discard_storage=discard,
199                            readonly_storage=readonly,
200                            stats_provider=self.stats_provider,
201                            expiration_enabled=expire,
202                            expiration_mode=mode,
203                            expiration_override_lease_duration=o_l_d,
204                            expiration_cutoff_date=cutoff_date,
205                            expiration_sharetypes=expiration_sharetypes)
206         self.add_service(ss)
207
208         d = self.when_tub_ready()
209         # we can't do registerReference until the Tub is ready
210         def _publish(res):
211             furl_file = os.path.join(self.basedir, "private", "storage.furl")
212             furl = self.tub.registerReference(ss, furlFile=furl_file)
213             ri_name = RIStorageServer.__remote_name__
214             self.introducer_client.publish(furl, "storage", ri_name)
215         d.addCallback(_publish)
216         d.addErrback(log.err, facility="tahoe.init",
217                      level=log.BAD, umid="aLGBKw")
218
219     def init_client(self):
220         helper_furl = self.get_config("client", "helper.furl", None)
221         DEP = self.DEFAULT_ENCODING_PARAMETERS
222         DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
223         DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
224         DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
225         convergence_s = self.get_or_create_private_config('convergence', _make_secret)
226         self.convergence = base32.a2b(convergence_s)
227         self._node_cache = weakref.WeakValueDictionary() # uri -> node
228
229         self.init_client_storage_broker()
230         self.add_service(History(self.stats_provider))
231         self.add_service(Uploader(helper_furl, self.stats_provider))
232         download_cachedir = os.path.join(self.basedir,
233                                          "private", "cache", "download")
234         self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
235         self.download_cache_dirman.setServiceParent(self)
236         self.add_service(Downloader(self.stats_provider))
237         self.init_stub_client()
238
239     def init_client_storage_broker(self):
240         # create a StorageFarmBroker object, for use by Uploader/Downloader
241         # (and everybody else who wants to use storage servers)
242         sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
243         self.storage_broker = sb
244
245         # load static server specifications from tahoe.cfg, if any.
246         # Not quite ready yet.
247         #if self.config.has_section("client-server-selection"):
248         #    server_params = {} # maps serverid to dict of parameters
249         #    for (name, value) in self.config.items("client-server-selection"):
250         #        pieces = name.split(".")
251         #        if pieces[0] == "server":
252         #            serverid = pieces[1]
253         #            if serverid not in server_params:
254         #                server_params[serverid] = {}
255         #            server_params[serverid][pieces[2]] = value
256         #    for serverid, params in server_params.items():
257         #        server_type = params.pop("type")
258         #        if server_type == "tahoe-foolscap":
259         #            s = storage_client.NativeStorageClient(*params)
260         #        else:
261         #            msg = ("unrecognized server type '%s' in "
262         #                   "tahoe.cfg [client-server-selection]server.%s.type"
263         #                   % (server_type, serverid))
264         #            raise storage_client.UnknownServerTypeError(msg)
265         #        sb.add_server(s.serverid, s)
266
267         # check to see if we're supposed to use the introducer too
268         if self.get_config("client-server-selection", "use_introducer",
269                            default=True, boolean=True):
270             sb.use_introducer(self.introducer_client)
271
272     def get_storage_broker(self):
273         return self.storage_broker
274
275     def init_stub_client(self):
276         def _publish(res):
277             # we publish an empty object so that the introducer can count how
278             # many clients are connected and see what versions they're
279             # running.
280             sc = StubClient()
281             furl = self.tub.registerReference(sc)
282             ri_name = RIStubClient.__remote_name__
283             self.introducer_client.publish(furl, "stub_client", ri_name)
284         d = self.when_tub_ready()
285         d.addCallback(_publish)
286         d.addErrback(log.err, facility="tahoe.init",
287                      level=log.BAD, umid="OEHq3g")
288
289     def get_history(self):
290         return self.getServiceNamed("history")
291
292     def init_control(self):
293         d = self.when_tub_ready()
294         def _publish(res):
295             c = ControlServer()
296             c.setServiceParent(self)
297             control_url = self.tub.registerReference(c)
298             self.write_private_config("control.furl", control_url + "\n")
299         d.addCallback(_publish)
300         d.addErrback(log.err, facility="tahoe.init",
301                      level=log.BAD, umid="d3tNXA")
302
303     def init_helper(self):
304         d = self.when_tub_ready()
305         def _publish(self):
306             h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
307             h.setServiceParent(self)
308             # TODO: this is confusing. BASEDIR/private/helper.furl is created
309             # by the helper. BASEDIR/helper.furl is consumed by the client
310             # who wants to use the helper. I like having the filename be the
311             # same, since that makes 'cp' work smoothly, but the difference
312             # between config inputs and generated outputs is hard to see.
313             helper_furlfile = os.path.join(self.basedir,
314                                            "private", "helper.furl")
315             self.tub.registerReference(h, furlFile=helper_furlfile)
316         d.addCallback(_publish)
317         d.addErrback(log.err, facility="tahoe.init",
318                      level=log.BAD, umid="K0mW5w")
319
320     def init_key_gen(self, key_gen_furl):
321         d = self.when_tub_ready()
322         def _subscribe(self):
323             self.tub.connectTo(key_gen_furl, self._got_key_generator)
324         d.addCallback(_subscribe)
325         d.addErrback(log.err, facility="tahoe.init",
326                      level=log.BAD, umid="z9DMzw")
327
328     def _got_key_generator(self, key_generator):
329         self._key_generator = key_generator
330         key_generator.notifyOnDisconnect(self._lost_key_generator)
331
332     def _lost_key_generator(self):
333         self._key_generator = None
334
335     def init_web(self, webport):
336         self.log("init_web(webport=%s)", args=(webport,))
337
338         from allmydata.webish import WebishServer
339         nodeurl_path = os.path.join(self.basedir, "node.url")
340         staticdir = self.get_config("node", "web.static", "public_html")
341         staticdir = os.path.expanduser(staticdir)
342         ws = WebishServer(self, webport, nodeurl_path, staticdir)
343         self.add_service(ws)
344
345     def init_ftp_server(self):
346         if self.get_config("ftpd", "enabled", False, boolean=True):
347             accountfile = self.get_config("ftpd", "accounts.file", None)
348             accounturl = self.get_config("ftpd", "accounts.url", None)
349             ftp_portstr = self.get_config("ftpd", "port", "8021")
350
351             from allmydata.frontends import ftpd
352             s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
353             s.setServiceParent(self)
354
355     def init_sftp_server(self):
356         if self.get_config("sftpd", "enabled", False, boolean=True):
357             accountfile = self.get_config("sftpd", "accounts.file", None)
358             accounturl = self.get_config("sftpd", "accounts.url", None)
359             sftp_portstr = self.get_config("sftpd", "port", "8022")
360             pubkey_file = self.get_config("sftpd", "host_pubkey_file")
361             privkey_file = self.get_config("sftpd", "host_privkey_file")
362
363             from allmydata.frontends import sftpd
364             s = sftpd.SFTPServer(self, accountfile, accounturl,
365                                  sftp_portstr, pubkey_file, privkey_file)
366             s.setServiceParent(self)
367
368     def _check_hotline(self, hotline_file):
369         if os.path.exists(hotline_file):
370             mtime = os.stat(hotline_file)[stat.ST_MTIME]
371             if mtime > time.time() - 120.0:
372                 return
373             else:
374                 self.log("hotline file too old, shutting down")
375         else:
376             self.log("hotline file missing, shutting down")
377         reactor.stop()
378
379     def get_encoding_parameters(self):
380         return self.DEFAULT_ENCODING_PARAMETERS
381
382     def connected_to_introducer(self):
383         if self.introducer_client:
384             return self.introducer_client.connected_to_introducer()
385         return False
386
387     def get_renewal_secret(self):
388         return hashutil.my_renewal_secret_hash(self._lease_secret)
389
390     def get_cancel_secret(self):
391         return hashutil.my_cancel_secret_hash(self._lease_secret)
392
393     def debug_wait_for_client_connections(self, num_clients):
394         """Return a Deferred that fires (with None) when we have connections
395         to the given number of peers. Useful for tests that set up a
396         temporary test network and need to know when it is safe to proceed
397         with an upload or download."""
398         def _check():
399             return len(self.storage_broker.get_all_servers()) >= num_clients
400         d = self.poll(_check, 0.5)
401         d.addCallback(lambda res: None)
402         return d
403
404
405     # these four methods are the primitives for creating filenodes and
406     # dirnodes. The first takes a URI and produces a filenode or (new-style)
407     # dirnode. The other three create brand-new filenodes/dirnodes.
408
409     def create_node_from_uri(self, writecap, readcap=None):
410         # this returns synchronously.
411         u = writecap or readcap
412         if not u:
413             # maybe the writecap was hidden because we're in a readonly
414             # directory, and the future cap format doesn't have a readcap, or
415             # something.
416             return UnknownNode(writecap, readcap)
417         u = IURI(u)
418         if isinstance(u, UnknownURI):
419             return UnknownNode(writecap, readcap)
420         u_s = u.to_string()
421         if u_s not in self._node_cache:
422             if IReadonlyNewDirectoryURI.providedBy(u):
423                 # new-style read-only dirnodes
424                 node = NewDirectoryNode(self).init_from_uri(u)
425             elif INewDirectoryURI.providedBy(u):
426                 # new-style dirnodes
427                 node = NewDirectoryNode(self).init_from_uri(u)
428             elif IFileURI.providedBy(u):
429                 if isinstance(u, LiteralFileURI):
430                     node = LiteralFileNode(u, self) # LIT
431                 else:
432                     node = FileNode(u, self, self.download_cache_dirman) # CHK
433             elif IMutableFileURI.providedBy(u):
434                 node = MutableFileNode(self).init_from_uri(u)
435             else:
436                 raise UnhandledCapTypeError("cap is recognized, but has no Node")
437             self._node_cache[u_s] = node  # note: WeakValueDictionary
438         return self._node_cache[u_s]
439
440     def create_empty_dirnode(self):
441         d = self.create_mutable_file()
442         d.addCallback(NewDirectoryNode.create_with_mutablefile, self)
443         return d
444
445     def create_mutable_file(self, contents="", keysize=None):
446         keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
447         n = MutableFileNode(self)
448         d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
449         d.addCallback(lambda res: n)
450         return d
451
452     def _generate_pubprivkeys(self, key_size):
453         if self._key_generator:
454             d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
455             def make_key_objs((verifying_key, signing_key)):
456                 v = rsa.create_verifying_key_from_string(verifying_key)
457                 s = rsa.create_signing_key_from_string(signing_key)
458                 return v, s
459             d.addCallback(make_key_objs)
460             return d
461         else:
462             # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
463             # secs
464             signer = rsa.generate(key_size)
465             verifier = signer.get_verifying_key()
466             return verifier, signer
467
468     def upload(self, uploadable):
469         uploader = self.getServiceNamed("uploader")
470         return uploader.upload(uploadable, history=self.get_history())
471
472
473     def list_all_upload_statuses(self):
474         return self.get_history().list_all_upload_statuses()
475
476     def list_all_download_statuses(self):
477         return self.get_history().list_all_download_statuses()
478
479     def list_all_mapupdate_statuses(self):
480         return self.get_history().list_all_mapupdate_statuses()
481     def list_all_publish_statuses(self):
482         return self.get_history().list_all_publish_statuses()
483     def list_all_retrieve_statuses(self):
484         return self.get_history().list_all_retrieve_statuses()
485
486     def list_all_helper_statuses(self):
487         try:
488             helper = self.getServiceNamed("helper")
489         except KeyError:
490             return []
491         return helper.get_all_upload_statuses()