3 from twisted.trial import unittest
4 from twisted.application import service
6 from foolscap.api import Tub, fireEventually, flushEventualQueue
8 from allmydata import key_generator
9 from allmydata.util import pollmixin
10 from allmydata.test.common import TEST_RSA_KEY_SIZE
11 from pycryptopp.publickey import rsa
13 def flush_but_dont_ignore(res):
14 d = flushEventualQueue()
20 class KeyGenService(unittest.TestCase, pollmixin.PollMixin):
22 self.parent = service.MultiService()
23 self.parent.startService()
26 t.setOption("expose-remote-exception-types", False)
27 t.setServiceParent(self.parent)
29 t.setLocationAutomatically()
30 return fireEventually()
33 d = self.parent.stopService()
34 d.addCallback(fireEventually)
35 d.addBoth(flush_but_dont_ignore)
38 def test_key_gen_service(self):
41 #print time.asctime(), msg
44 #print 'starting key generator service'
45 keysize = TEST_RSA_KEY_SIZE
46 kgs = key_generator.KeyGeneratorService(display_furl=False, default_key_size=keysize, basedir="key_generator_service")
47 kgs.key_generator.verbose = True
48 kgs.setServiceParent(self.parent)
49 kgs.key_generator.pool_size = 8
52 return len(kgs.key_generator.keypool) == kgs.key_generator.pool_size
54 # first wait for key gen pool to fill up
56 d.addCallback(p, 'waiting for pool to fill up')
57 d.addCallback(lambda junk: self.poll(keypool_full))
59 d.addCallback(p, 'grabbing a few keys')
60 # grab a few keys, check that pool size shrinks
61 def get_key(junk=None):
62 d = self.tub.getReference(kgs.keygen_furl)
63 d.addCallback(lambda kg: kg.callRemote('get_rsa_key_pair', keysize))
66 def check_poolsize(junk, size):
67 self.failUnlessEqual(len(kgs.key_generator.keypool), size)
70 for i in range(n_keys_to_waste):
71 d.addCallback(get_key)
72 d.addCallback(check_poolsize, kgs.key_generator.pool_size - n_keys_to_waste)
74 d.addCallback(p, 'checking a key works')
75 # check that a retrieved key is actually useful
76 d.addCallback(get_key)
77 def check_key_works(keys):
78 verifying_key, signing_key = keys
79 v = rsa.create_verifying_key_from_string(verifying_key)
80 s = rsa.create_signing_key_from_string(signing_key)
83 self.failUnless(v.verify(junk, sig))
84 d.addCallback(check_key_works)
86 d.addCallback(p, 'checking pool exhaustion')
88 for i in range(kgs.key_generator.pool_size):
89 d.addCallback(get_key)
90 d.addCallback(check_poolsize, 0)
92 # and check it still works (will gen key synchronously on demand)
93 d.addCallback(get_key)
94 d.addCallback(check_key_works)
96 d.addCallback(p, 'checking pool replenishment')
97 # check that the pool will refill
98 d.addCallback(lambda junk: self.poll(keypool_full))