def _get_count(self, name):
return self.stats_provider.get_stats()["counters"].get(name, 0)
+
+ def create(self, dbfile):
+ bdb = backupdb.get_backupdb(dbfile)
+ self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
+ return bdb
+
+ def _test_db_basic(self):
+ fileutil.make_dirs(self.basedir)
+ dbfile = os.path.join(self.basedir, "dbfile")
+ bdb = self.create(dbfile)
+
+ def _test_uploader_start_service(self):
+ self.uploader = None
+ self.set_up_grid()
+ self.client = self.g.clients[0]
+
+ d = self.client.create_dirnode()
+ def _made_upload_dir(n):
+ self.failUnless(IDirectoryNode.providedBy(n))
+ self.upload_dirnode = n
+ self.upload_dircap = n.get_uri()
+ self.uploader = DropUploader(self.client, self.upload_dircap, self.basedir.encode('utf-8'),
+ "magicfolderdb.sqlite", inotify=self.inotify)
+ self.uploader.startService()
+ self.failUnlessEqual(self.uploader._db.VERSION, 2)
+ d.addCallback(_made_upload_dir)
+
+ # Prevent unclean reactor errors.
+ def _cleanup(res):
+ d = defer.succeed(None)
+ if self.uploader is not None:
+ d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
+ d.addCallback(lambda ign: res)
+ return d
+ d.addBoth(_cleanup)
+ return d
+
def _test(self):
self.uploader = None
self.set_up_grid()
readonly_dircap = n.get_readonly_uri()
self.shouldFail(AssertionError, 'invalid local.directory', 'could not be represented',
- DropUploader, client, upload_dircap, '\xFF', inotify=fake_inotify)
+ DropUploader, client, upload_dircap, '\xFF', 'magicfolderdb', inotify=fake_inotify)
self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
- DropUploader, client, upload_dircap, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify)
+ DropUploader, client, upload_dircap, os.path.join(self.basedir, "Laputa"), 'magicfolderdb', inotify=fake_inotify)
self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
- DropUploader, client, upload_dircap, not_a_dir, inotify=fake_inotify)
+ DropUploader, client, upload_dircap, not_a_dir, 'magicfolderdb', inotify=fake_inotify)
self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
- DropUploader, client, 'bad', errors_dir, inotify=fake_inotify)
+ DropUploader, client, 'bad', errors_dir, 'magicfolderdb', inotify=fake_inotify)
self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
- DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+ DropUploader, client, 'URI:LIT:foo', errors_dir, 'magicfolderdb', inotify=fake_inotify)
self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
- DropUploader, client, readonly_dircap, errors_dir, inotify=fake_inotify)
+ DropUploader, client, readonly_dircap, errors_dir, 'magicfolderdb', inotify=fake_inotify)
d.addCallback(_made_upload_dir)
return d
self.basedir = "drop_upload.MockTest.test_drop_upload"
return self._test()
+ def test_basic_db(self):
+ self.inotify = fake_inotify
+ self.basedir = "drop_upload.MockTest.test_basic_db"
+ return self._test_db_basic()
+
+ def test_uploader_start_service(self):
+ self.inotify = fake_inotify
+ self.basedir = "drop_upload.MockTest._test_uploader_start_service"
+ return self._test_uploader_start_service()
+
def notify_close_write(self, path):
self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
class RealTest(DropUploadTestMixin, unittest.TestCase):
"""This is skipped unless both Twisted and the platform support inotify."""
- def create(self, dbfile):
- bdb = backupdb.get_backupdb(dbfile)
- self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
- return bdb
-
- def test_db_basic(self):
- self.basedir = basedir = os.path.join("dropupload", "basic")
- fileutil.make_dirs(basedir)
- dbfile = os.path.join(basedir, "dbfile")
- bdb = self.create(dbfile)
-
- def test_uploader_startService(self):
- self.uploader = None
- self.inotify = None # use the appropriate inotify for the platform
- self.basedir = "drop_upload.RealTest.test_uploader_startService"
- self.set_up_grid()
- self.client = self.g.clients[0]
-
- d = self.client.create_dirnode()
- def _made_upload_dir(n):
- self.failUnless(IDirectoryNode.providedBy(n))
- self.upload_dirnode = n
- self.upload_dircap = n.get_uri()
- self.uploader = DropUploader(self.client, self.upload_dircap, self.basedir.encode('utf-8'),
- "magicfolderdb.sqlite", inotify=self.inotify)
- self.uploader.startService()
- self.failUnlessEqual(self.uploader._db.VERSION, 2)
- d.addCallback(_made_upload_dir)
-
- # Prevent unclean reactor errors.
- def _cleanup(res):
- d = defer.succeed(None)
- if self.uploader is not None:
- d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
- d.addCallback(lambda ign: res)
- return d
- d.addBoth(_cleanup)
- return d
def test_drop_upload(self):
# We should always have runtime.platform.supportsINotify, because we're using
def notify_close_write(self, path):
# Writing to the file causes the notification.
pass
+
+ def test_basic_db(self):
+ self.inotify = None
+ self.basedir = "drop_upload.RealTest.test_basic_db"
+ return self._test_db_basic()
+
+ def test_uploader_start_service(self):
+ self.inotify = None
+ self.basedir = "drop_upload.RealTest._test_uploader_start_service"
+ return self._test_uploader_start_service()