3 from twisted.trial import unittest
4 from twisted.application import service
6 from foolscap import Tub, eventual
8 from allmydata import key_generator
9 from allmydata.util import pollmixin
10 from pycryptopp.publickey import rsa
12 def flush_but_dont_ignore(res):
13 d = eventual.flushEventualQueue()
19 class KeyGenService(unittest.TestCase, pollmixin.PollMixin):
21 self.parent = service.MultiService()
22 self.parent.startService()
25 t.setServiceParent(self.parent)
27 t.setLocationAutomatically()
28 return eventual.fireEventually()
31 d = self.parent.stopService()
32 d.addCallback(eventual.fireEventually)
33 d.addBoth(flush_but_dont_ignore)
36 def test_key_gen_service(self):
39 #print time.asctime(), msg
42 #print 'starting key generator service'
44 kgs = key_generator.KeyGeneratorService(display_furl=False, default_key_size=keysize)
45 kgs.key_generator.verbose = True
46 kgs.setServiceParent(self.parent)
47 kgs.key_generator.pool_size = 8
50 return len(kgs.key_generator.keypool) == kgs.key_generator.pool_size
52 # first wait for key gen pool to fill up
53 d = eventual.fireEventually()
54 d.addCallback(p, 'waiting for pool to fill up')
55 d.addCallback(lambda junk: self.poll(keypool_full))
57 d.addCallback(p, 'grabbing a few keys')
58 # grab a few keys, check that pool size shrinks
59 def get_key(junk=None):
60 d = self.tub.getReference(kgs.keygen_furl)
61 d.addCallback(lambda kg: kg.callRemote('get_rsa_key_pair', keysize))
64 def check_poolsize(junk, size):
65 self.failUnlessEqual(len(kgs.key_generator.keypool), size)
68 for i in range(n_keys_to_waste):
69 d.addCallback(get_key)
70 d.addCallback(check_poolsize, kgs.key_generator.pool_size - n_keys_to_waste)
72 d.addCallback(p, 'checking a key works')
73 # check that a retrieved key is actually useful
74 d.addCallback(get_key)
75 def check_key_works(keys):
76 verifying_key, signing_key = keys
77 v = rsa.create_verifying_key_from_string(verifying_key)
78 s = rsa.create_signing_key_from_string(signing_key)
81 self.failUnless(v.verify(junk, sig))
82 d.addCallback(check_key_works)
84 d.addCallback(p, 'checking pool exhaustion')
86 for i in range(kgs.key_generator.pool_size):
87 d.addCallback(get_key)
88 d.addCallback(check_poolsize, 0)
90 # and check it still works (will gen key synchronously on demand)
91 d.addCallback(get_key)
92 d.addCallback(check_key_works)
94 d.addCallback(p, 'checking pool replenishment')
95 # check that the pool will refill
96 d.addCallback(lambda junk: self.poll(keypool_full))