3 from twisted.trial import unittest
4 from twisted.application import service
6 from foolscap.api import Tub, fireEventually, flushEventualQueue
8 from allmydata import key_generator
9 from allmydata.util import pollmixin
10 from pycryptopp.publickey import rsa
# Helper chained during teardown: starts a flush of the eventual-send queue by
# calling flushEventualQueue() and keeping the result in `d`.
# NOTE(review): only the first statement of the body is visible in this view.
# Judging by the name, the remainder presumably hands `res` back once the
# flush is done, so an earlier failure is not swallowed -- confirm against the
# complete file.
12 def flush_but_dont_ignore(res):
13 d = flushEventualQueue()
# Trial test case exercising key_generator.KeyGeneratorService.
# pollmixin.PollMixin is mixed in; it presumably supplies the self.poll() used
# below to wait on the key pool -- confirm.
19 class KeyGenService(unittest.TestCase, pollmixin.PollMixin):
# -- fixture setup; the enclosing `def` line is not visible in this view --
# A started MultiService acts as the parent for the services attached below.
21 self.parent = service.MultiService()
22 self.parent.startService()
# NOTE(review): `t` is bound on a line not visible here; presumably it is the
# Tub that get_key later reaches as self.tub -- confirm.
25 t.setOption("expose-remote-exception-types", False)
26 t.setServiceParent(self.parent)
28 t.setLocationAutomatically()
# Hand the result of fireEventually() back to the caller.
29 return fireEventually()
# -- fixture teardown; the enclosing `def` line is not visible in this view --
# Stop the parent service, then chain fireEventually and the queue flush onto
# the resulting Deferred. addBoth registers the flush as both callback and
# errback, so it runs whether or not the preceding steps failed.
32 d = self.parent.stopService()
33 d.addCallback(fireEventually)
34 d.addBoth(flush_but_dont_ignore)
# Exercises the key pool through a chain of callbacks on `d`: wait for the
# pool to fill, fetch keys and check the pool size, verify a fetched key,
# fetch pool_size more keys and expect an empty pool, fetch once more, then
# wait for the pool to fill again.
# NOTE(review): the lines that create `d` and define `p`, `keysize`,
# `n_keys_to_waste`, `junk` and `sig` are not visible in this view.
37 def test_key_gen_service(self):
40 #print time.asctime(), msg
43 #print 'starting key generator service'
# Build the service with `keysize` as its default key size, turn on verbose
# mode, attach it to the parent service and set the pool size to 8.
45 kgs = key_generator.KeyGeneratorService(display_furl=False, default_key_size=keysize)
46 kgs.key_generator.verbose = True
47 kgs.setServiceParent(self.parent)
48 kgs.key_generator.pool_size = 8
# True once the pool holds exactly pool_size keys. Presumably this is the body
# of the `keypool_full` predicate passed to self.poll() below -- its `def`
# line is not visible.
51 return len(kgs.key_generator.keypool) == kgs.key_generator.pool_size
53 # first wait for key gen pool to fill up
# `p` receives a progress message as an extra callback argument; it looks
# like a trace helper (see the commented-out print above) -- confirm.
55 d.addCallback(p, 'waiting for pool to fill up')
56 d.addCallback(lambda junk: self.poll(keypool_full))
58 d.addCallback(p, 'grabbing a few keys')
59 # grab a few keys, check that pool size shrinks
# get_key: obtain a remote reference for kgs.keygen_furl through self.tub and
# invoke 'get_rsa_key_pair' on it with `keysize`. The incoming callback value
# is ignored (junk defaults to None).
60 def get_key(junk=None):
61 d = self.tub.getReference(kgs.keygen_furl)
62 d.addCallback(lambda kg: kg.callRemote('get_rsa_key_pair', keysize))
# check_poolsize: fail the test unless the pool currently holds `size` keys.
65 def check_poolsize(junk, size):
66 self.failUnlessEqual(len(kgs.key_generator.keypool), size)
# Queue n_keys_to_waste fetches and a check that the pool holds that many
# fewer keys than pool_size.
69 for i in range(n_keys_to_waste):
70 d.addCallback(get_key)
71 d.addCallback(check_poolsize, kgs.key_generator.pool_size - n_keys_to_waste)
73 d.addCallback(p, 'checking a key works')
74 # check that a retrieved key is actually useful
75 d.addCallback(get_key)
# check_key_works: unpack the (verifying_key, signing_key) pair, rebuild both
# key objects from their string forms and require that `sig` verifies over
# `junk`. Presumably `sig` is produced by signing `junk` with `s` on a line
# not visible here -- confirm.
76 def check_key_works(keys):
77 verifying_key, signing_key = keys
78 v = rsa.create_verifying_key_from_string(verifying_key)
79 s = rsa.create_signing_key_from_string(signing_key)
82 self.failUnless(v.verify(junk, sig))
83 d.addCallback(check_key_works)
85 d.addCallback(p, 'checking pool exhaustion')
# Queue pool_size further fetches and a check that the pool is then empty.
87 for i in range(kgs.key_generator.pool_size):
88 d.addCallback(get_key)
89 d.addCallback(check_poolsize, 0)
91 # and check it still works (will gen key synchronously on demand)
92 d.addCallback(get_key)
93 d.addCallback(check_key_works)
95 d.addCallback(p, 'checking pool replenishment')
96 # check that the pool will refill
97 d.addCallback(lambda junk: self.poll(keypool_full))
# NOTE(review): no `return d` is visible after this point. If the complete
# file also lacks one, the test runner would not wait for this callback
# chain -- confirm.