d.addBoth(self.cleanup)
return d
+ @defer.inlineCallbacks
+ def test_delete(self):
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, u"local_dir")
+ self.mkdir_nonascii(self.local_dir)
+
+ yield self.create_invite_join_magic_folder(u"Alice\u0101", self.local_dir)
+ yield self._restart_client(None)
+
+ try:
+ # create a file
+ up_proc = self.magicfolder.uploader.set_hook('processed')
+ down_proc = self.magicfolder.downloader.set_hook('processed')
+ path = os.path.join(self.local_dir, u'foo')
+ with open(path, 'w') as f:
+ f.write('foo\n')
+ self.notify(to_filepath(path), self.inotify.IN_CLOSE_WRITE)
+ x = yield up_proc
+ self.assertIs(x, None) # some things "return" an Exception instead of raising it
+ self.assertTrue(os.path.exists(path))
+
+ # the real test part: delete the file
+ proc = self.magicfolder.uploader.set_hook('processed')
+ os.unlink(path)
+ self.notify(to_filepath(path), self.inotify.IN_DELETE)
+ x = yield proc
+ self.assertIs(x, None)
+ self.assertFalse(os.path.exists(path))
+
+ # ensure we still have a DB entry, and that the version is 1
+ node, metadata = yield self.magicfolder.downloader._get_collective_latest_file(u'foo')
+ self.assertTrue(node is not None, "Failed to find '{}' in DMD".format(path))
+ self.failUnlessEqual(metadata['version'], 1)
+
+ finally:
+ yield self.cleanup(None)
+
def test_magic_folder(self):
self.set_up_grid()
self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))