This includes one fix (in test_web) which was testing the wrong thing.
def callRemoteOnly(self, methname, *args, **kwargs):
d = self.callRemote(methname, *args, **kwargs)
+ del d # explicitly ignored
return None
def callRemote(self, methname, *args, **kwargs):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
"my_version", "oldest_version")
+ self.failUnless(isinstance(ic, IntroducerClient))
def test_listen(self):
i = IntroducerService()
basedir = "test_node/test_secrets_dir"
fileutil.make_dirs(basedir)
n = TestNode(basedir)
+ self.failUnless(isinstance(n, TestNode))
self.failUnless(os.path.exists(os.path.join(basedir, "private")))
def test_secrets_dir_protected(self):
basedir = "test_node/test_secrets_dir_protected"
fileutil.make_dirs(basedir)
n = TestNode(basedir)
+ self.failUnless(isinstance(n, TestNode))
privdir = os.path.join(basedir, "private")
st = os.stat(privdir)
bits = stat.S_IMODE(st[stat.ST_MODE])
def test_oneshot(self):
ol = observer.OneShotObserverList()
rep = repr(ol)
+ self.failUnlessEqual(rep, "<OneShotObserverList [[]]>")
d1 = ol.when_fired()
d2 = ol.when_fired()
def _addmore(res):
d1.addCallback(_addmore)
ol.fire("result")
rep = repr(ol)
+ self.failUnlessEqual(rep, "<OneShotObserverList -> result>")
d4 = ol.when_fired()
dl = defer.DeferredList([d1,d2,d4])
return dl
#ctx = RequestContext()
#unfilled = pt.renderSynchronously(ctx)
lots_of_stan = pt.do_forms(self.getarg)
+ self.failUnlessEqual(type(lots_of_stan), list)
self.fields = {'filled': True,
"num_users": 50e3,
}
#filled = pt.renderSynchronously(ctx)
more_stan = pt.do_forms(self.getarg)
+ self.failUnlessEqual(type(more_stan), list)
# trigger the wraparound configuration
self.fields["num_servers"] = 5
log.msg("UPLOADING AGAIN")
up = upload.Data(DATA, convergence=convergence)
up.max_segment_size = 1024
- d1 = self.uploader.upload(up)
+ return self.uploader.upload(up)
d.addCallback(_upload_again)
def _download_to_data(res):
from twisted.trial import unittest
from allmydata import uri
-from allmydata.util import hashutil
+from allmydata.util import hashutil, base32
from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
IVerifierURI
self.failUnlessEqual(d["big_hash"], hashutil.tagged_hash("foo", "bar"))
readable = uri.unpack_extension_readable(ext)
+ self.failUnlessEqual(readable["needed_shares"], 3)
+ self.failUnlessEqual(readable["stuff"], "value")
+ self.failUnlessEqual(readable["size"], 12)
+ self.failUnlessEqual(readable["big_hash"],
+ base32.b2a(hashutil.tagged_hash("foo", "bar")))
+ self.failUnlessEqual(readable["UEB_hash"],
+ base32.b2a(hashutil.uri_extension_hash(ext)))
class Invalid(unittest.TestCase):
def test_from_future(self):
_failIfExists("a")
_failUnlessExists("b")
_failUnlessExists("c")
+ del b2
ctr = [0]
class EqButNotIs:
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
+ del d1,d2,d3,d4
+
class SampleError(Exception):
pass
if u["type"] == "file" and u["path"] == [u"good"]][0]
self.failUnlessEqual(ugood["cap"], self.uris["good"])
ugoodcrr = ugood["check-and-repair-results"]
- self.failUnlessEqual(u0crr["repair-attempted"], False)
- self.failUnlessEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
+ self.failUnlessEqual(ugoodcrr["repair-attempted"], False)
+ self.failUnlessEqual(ugoodcrr["pre-repair-results"]["results"]["count-shares-good"], 10)
usick = [u for u in units
if u["type"] == "file" and u["path"] == [u"sick"]][0]