self.failUnless("node.uptime" in stats)
self.failUnless(isinstance(stats["node.uptime"], float))
+ @mock.patch('allmydata.util.log.msg')
+ @mock.patch('allmydata.frontends.drop_upload.DropUploader')
+ def test_create_drop_uploader(self, mock_drop_uploader, mock_log_msg):
+ class MockDropUploader(service.MultiService):
+ name = 'drop-upload'
+
+ def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None):
+ service.MultiService.__init__(self)
+ self.client = client
+ self.upload_dircap = upload_dircap
+ self.local_dir_utf8 = local_dir_utf8
+ self.inotify = inotify
+
+ mock_drop_uploader.side_effect = MockDropUploader
+
+ upload_dircap = "URI:DIR2:blah"
+ local_dir_utf8 = u"loc\u0101l_dir".encode('utf-8')
+ config = (BASECONFIG +
+ "[storage]\n" +
+ "enabled = false\n" +
+ "[drop_upload]\n" +
+ "enabled = true\n" +
+ "upload.dircap = " + upload_dircap + "\n" +
+ "local.directory = " + local_dir_utf8 + "\n")
+
+ basedir1 = "test_client.Basic.test_create_drop_uploader1"
+ os.mkdir(basedir1)
+ fileutil.write(os.path.join(basedir1, "tahoe.cfg"), config)
+ c1 = client.Client(basedir1)
+ uploader = c1.getServiceNamed('drop-upload')
+ self.failUnless(isinstance(uploader, MockDropUploader), uploader)
+ self.failUnlessReallyEqual(uploader.client, c1)
+ self.failUnlessReallyEqual(uploader.upload_dircap, upload_dircap)
+ self.failUnlessReallyEqual(uploader.local_dir_utf8, local_dir_utf8)
+ self.failUnless(uploader.inotify is None, uploader.inotify)
+ self.failUnless(uploader.running)
+
+ class Boom(Exception):
+ pass
+ mock_drop_uploader.side_effect = Boom("something wrong")
+
+ basedir2 = "test_client.Basic.test_create_drop_uploader2"
+ os.mkdir(basedir2)
+ fileutil.write(os.path.join(basedir2, "tahoe.cfg"),
+ BASECONFIG +
+ "[drop_upload]\n" +
+ "enabled = true\n")
+ c2 = client.Client(basedir2)
+ self.failUnlessRaises(KeyError, c2.getServiceNamed, 'drop-upload')
+ self.failIf([True for arg in mock_log_msg.call_args_list if "Boom" in repr(arg) or "something wrong" in repr(arg)],
+ mock_log_msg.call_args_list)
+ self.failUnless([True for arg in mock_log_msg.call_args_list if "upload.dircap or local.directory not specified" in repr(arg)],
+ mock_log_msg.call_args_list)
+
+ basedir3 = "test_client.Basic.test_create_drop_uploader3"
+ os.mkdir(basedir3)
+ fileutil.write(os.path.join(basedir3, "tahoe.cfg"), config)
+ c3 = client.Client(basedir3)
+ self.failUnless([True for arg in mock_log_msg.call_args_list if "Boom" in repr(arg) and "something wrong" in repr(arg)],
+ mock_log_msg.call_args_list)
+
+
def flush_but_dont_ignore(res):
d = flushEventualQueue()
def _done(ignored):