+++ /dev/null
-
-import sys
-
-from twisted.internet import defer
-from twisted.python.filepath import FilePath
-from twisted.application import service
-from foolscap.api import eventually
-
-from allmydata.interfaces import IDirectoryNode
-
-from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
-from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
- unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
-from allmydata.immutable.upload import FileName
-from allmydata import backupdb
-
-
-
-class MagicFolder(service.MultiService):
- name = 'magic-folder'
-
- def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
- pending_delay=1.0):
- precondition_abspath(local_dir)
-
- service.MultiService.__init__(self)
- self._local_dir = abspath_expanduser_unicode(local_dir)
- self._client = client
- self._stats_provider = client.stats_provider
- self._convergence = client.convergence
- self._local_path = to_filepath(self._local_dir)
- self._dbfile = dbfile
-
- self.is_upload_ready = False
-
- if inotify is None:
- if sys.platform == "win32":
- from allmydata.windows import inotify
- else:
- from twisted.internet import inotify
- self._inotify = inotify
-
- if not self._local_path.exists():
- raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
- "but there is no directory at that location."
- % quote_local_unicode_path(local_dir))
- if not self._local_path.isdir():
- raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
- "but the thing at that location is not a directory."
- % quote_local_unicode_path(local_dir))
-
- # TODO: allow a path rather than a cap URI.
- self._parent = self._client.create_node_from_uri(upload_dircap)
- if not IDirectoryNode.providedBy(self._parent):
- raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
- if self._parent.is_unknown() or self._parent.is_readonly():
- raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
-
- self._uploaded_callback = lambda ign: None
-
- self._notifier = inotify.INotify()
- if hasattr(self._notifier, 'set_pending_delay'):
- self._notifier.set_pending_delay(pending_delay)
-
- # We don't watch for IN_CREATE, because that would cause us to read and upload a
- # possibly-incomplete file before the application has closed it. There should always
- # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
- # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
- mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
- self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
-
- def _check_db_file(self, childpath):
- # returns True if the file must be uploaded.
- assert self._db != None
- r = self._db.check_file(childpath)
- filecap = r.was_uploaded()
- if filecap is False:
- return True
-
- def startService(self):
- self._db = backupdb.get_backupdb(self._dbfile)
- if self._db is None:
- return Failure(Exception('ERROR: Unable to load magic folder db.'))
-
- service.MultiService.startService(self)
- d = self._notifier.startReading()
- self._stats_provider.count('drop_upload.dirs_monitored', 1)
- return d
-
- def upload_ready(self):
- """upload_ready is used to signal us to start
- processing the upload items...
- """
- self.is_upload_ready = True
-
- def _notify(self, opaque, path, events_mask):
- self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
-
- self._stats_provider.count('drop_upload.files_queued', 1)
- eventually(self._process, opaque, path, events_mask)
-
- def _process(self, opaque, path, events_mask):
- d = defer.succeed(None)
-
- # FIXME: if this already exists as a mutable file, we replace the directory entry,
- # but we should probably modify the file (as the SFTP frontend does).
- def _add_file(ign):
- name = path.basename()
- # on Windows the name is already Unicode
- if not isinstance(name, unicode):
- name = name.decode(get_filesystem_encoding())
-
- u = FileName(path.path, self._convergence)
- return self._parent.add_file(name, u)
- d.addCallback(_add_file)
-
- def _succeeded(ign):
- self._stats_provider.count('drop_upload.files_queued', -1)
- self._stats_provider.count('drop_upload.files_uploaded', 1)
- def _failed(f):
- self._stats_provider.count('drop_upload.files_queued', -1)
- if path.exists():
- self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
- self._stats_provider.count('drop_upload.files_failed', 1)
- return f
- else:
- self._log("drop-upload: notified file %r disappeared "
- "(this is normal for temporary files): %r" % (path.path, f))
- self._stats_provider.count('drop_upload.files_disappeared', 1)
- return None
- d.addCallbacks(_succeeded, _failed)
- d.addBoth(self._uploaded_callback)
- return d
-
- def set_uploaded_callback(self, callback):
- """This sets a function that will be called after a file has been uploaded."""
- self._uploaded_callback = callback
-
- def finish(self, for_tests=False):
- self._notifier.stopReading()
- self._stats_provider.count('drop_upload.dirs_monitored', -1)
- if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
- return self._notifier.wait_until_stopped()
- else:
- return defer.succeed(None)
-
- def _log(self, msg):
- self._client.log(msg)
- #open("events", "ab+").write(msg)
--- /dev/null
+
+import sys
+
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
+from twisted.application import service
+from foolscap.api import eventually
+
+from allmydata.interfaces import IDirectoryNode
+
+from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
+from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
+ unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
+from allmydata.immutable.upload import FileName
+from allmydata import backupdb
+
+
+
+class MagicFolder(service.MultiService):
+ name = 'magic-folder'
+
+ def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
+ pending_delay=1.0):
+ precondition_abspath(local_dir)
+
+ service.MultiService.__init__(self)
+ self._local_dir = abspath_expanduser_unicode(local_dir)
+ self._client = client
+ self._stats_provider = client.stats_provider
+ self._convergence = client.convergence
+ self._local_path = to_filepath(self._local_dir)
+ self._dbfile = dbfile
+
+ self.is_upload_ready = False
+
+ if inotify is None:
+ if sys.platform == "win32":
+ from allmydata.windows import inotify
+ else:
+ from twisted.internet import inotify
+ self._inotify = inotify
+
+ if not self._local_path.exists():
+ raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
+ "but there is no directory at that location."
+ % quote_local_unicode_path(local_dir))
+ if not self._local_path.isdir():
+ raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
+ "but the thing at that location is not a directory."
+ % quote_local_unicode_path(local_dir))
+
+ # TODO: allow a path rather than a cap URI.
+ self._parent = self._client.create_node_from_uri(upload_dircap)
+ if not IDirectoryNode.providedBy(self._parent):
+ raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
+ if self._parent.is_unknown() or self._parent.is_readonly():
+ raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
+
+ self._uploaded_callback = lambda ign: None
+
+ self._notifier = inotify.INotify()
+ if hasattr(self._notifier, 'set_pending_delay'):
+ self._notifier.set_pending_delay(pending_delay)
+
+ # We don't watch for IN_CREATE, because that would cause us to read and upload a
+ # possibly-incomplete file before the application has closed it. There should always
+ # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
+ # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
+ mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
+ self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
+
+ def _check_db_file(self, childpath):
+ # returns True if the file must be uploaded.
+ assert self._db != None
+ r = self._db.check_file(childpath)
+ filecap = r.was_uploaded()
+ if filecap is False:
+ return True
+
+ def startService(self):
+ self._db = backupdb.get_backupdb(self._dbfile)
+ if self._db is None:
+ return Failure(Exception('ERROR: Unable to load magic folder db.'))
+
+ service.MultiService.startService(self)
+ d = self._notifier.startReading()
+ self._stats_provider.count('drop_upload.dirs_monitored', 1)
+ return d
+
+ def upload_ready(self):
+ """upload_ready is used to signal us to start
+ processing the upload items...
+ """
+ self.is_upload_ready = True
+
+ def _notify(self, opaque, path, events_mask):
+ self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+
+ self._stats_provider.count('drop_upload.files_queued', 1)
+ eventually(self._process, opaque, path, events_mask)
+
+ def _process(self, opaque, path, events_mask):
+ d = defer.succeed(None)
+
+ # FIXME: if this already exists as a mutable file, we replace the directory entry,
+ # but we should probably modify the file (as the SFTP frontend does).
+ def _add_file(ign):
+ name = path.basename()
+ # on Windows the name is already Unicode
+ if not isinstance(name, unicode):
+ name = name.decode(get_filesystem_encoding())
+
+ u = FileName(path.path, self._convergence)
+ return self._parent.add_file(name, u)
+ d.addCallback(_add_file)
+
+ def _succeeded(ign):
+ self._stats_provider.count('drop_upload.files_queued', -1)
+ self._stats_provider.count('drop_upload.files_uploaded', 1)
+ def _failed(f):
+ self._stats_provider.count('drop_upload.files_queued', -1)
+ if path.exists():
+ self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
+ self._stats_provider.count('drop_upload.files_failed', 1)
+ return f
+ else:
+ self._log("drop-upload: notified file %r disappeared "
+ "(this is normal for temporary files): %r" % (path.path, f))
+ self._stats_provider.count('drop_upload.files_disappeared', 1)
+ return None
+ d.addCallbacks(_succeeded, _failed)
+ d.addBoth(self._uploaded_callback)
+ return d
+
+ def set_uploaded_callback(self, callback):
+ """This sets a function that will be called after a file has been uploaded."""
+ self._uploaded_callback = callback
+
+ def finish(self, for_tests=False):
+ self._notifier.stopReading()
+ self._stats_provider.count('drop_upload.dirs_monitored', -1)
+ if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
+ return self._notifier.wait_until_stopped()
+ else:
+ return defer.succeed(None)
+
+ def _log(self, msg):
+ self._client.log(msg)
+ #open("events", "ab+").write(msg)
+++ /dev/null
-
-import os, sys
-
-from twisted.trial import unittest
-from twisted.python import runtime
-from twisted.internet import defer
-
-from allmydata.interfaces import IDirectoryNode, NoSuchChildError
-
-from allmydata.util import fake_inotify, fileutil
-from allmydata.util.encodingutil import get_filesystem_encoding, to_filepath
-from allmydata.util.consumer import download_to_data
-from allmydata.test.no_network import GridTestMixin
-from allmydata.test.common_util import ReallyEqualMixin, NonASCIIPathMixin
-from allmydata.test.common import ShouldFailMixin
-
-from allmydata.frontends.magic_folder import MagicFolder
-from allmydata.util.fileutil import abspath_expanduser_unicode
-
-
-class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
- """
- These tests will be run both with a mock notifier, and (on platforms that support it)
- with the real INotify.
- """
-
- def setUp(self):
- GridTestMixin.setUp(self)
- temp = self.mktemp()
- self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding()))
- def _get_count(self, name):
- return self.stats_provider.get_stats()["counters"].get(name, 0)
-
- def _test(self):
- self.uploader = None
- self.set_up_grid()
- self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
- self.mkdir_nonascii(self.local_dir)
-
- self.client = self.g.clients[0]
- self.stats_provider = self.client.stats_provider
-
- d = self.client.create_dirnode()
- def _made_upload_dir(n):
- self.failUnless(IDirectoryNode.providedBy(n))
- self.upload_dirnode = n
- self.upload_dircap = n.get_uri()
- self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
- inotify=self.inotify)
- return self.uploader.startService()
- d.addCallback(_made_upload_dir)
-
- # Write something short enough for a LIT file.
- d.addCallback(lambda ign: self._test_file(u"short", "test"))
-
- # Write to the same file again with different data.
- d.addCallback(lambda ign: self._test_file(u"short", "different"))
-
- # Test that temporary files are not uploaded.
- d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
-
- # Test that we tolerate creation of a subdirectory.
- d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
-
- # Write something longer, and also try to test a Unicode name if the fs can represent it.
- name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long")
- d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
-
- # TODO: test that causes an upload failure.
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
-
- # Prevent unclean reactor errors.
- def _cleanup(res):
- d = defer.succeed(None)
- if self.uploader is not None:
- d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
- d.addCallback(lambda ign: res)
- return d
- d.addBoth(_cleanup)
- return d
-
- def _test_file(self, name_u, data, temporary=False):
- previously_uploaded = self._get_count('drop_upload.files_uploaded')
- previously_disappeared = self._get_count('drop_upload.files_disappeared')
-
- d = defer.Deferred()
-
- # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
- # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
- self.uploader.set_uploaded_callback(d.callback)
-
- path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
- path = to_filepath(path_u)
-
- # We don't use FilePath.setContent() here because it creates a temporary file that
- # is renamed into place, which causes events that the test is not expecting.
- f = open(path_u, "wb")
- try:
- if temporary and sys.platform != "win32":
- os.unlink(path_u)
- f.write(data)
- finally:
- f.close()
- if temporary and sys.platform == "win32":
- os.unlink(path_u)
- fileutil.flush_volume(path_u)
- self.notify_close_write(path)
-
- if temporary:
- d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
- self.upload_dirnode.get, name_u))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
- previously_disappeared + 1))
- else:
- d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
- d.addCallback(download_to_data)
- d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
- previously_uploaded + 1))
-
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
- return d
-
-
-class MockTest(MagicFolderTestMixin, unittest.TestCase):
- """This can run on any platform, and even if twisted.internet.inotify can't be imported."""
-
- def test_errors(self):
- self.set_up_grid()
-
- errors_dir = abspath_expanduser_unicode(u"errors_dir", base=self.basedir)
- os.mkdir(errors_dir)
- not_a_dir = abspath_expanduser_unicode(u"NOT_A_DIR", base=self.basedir)
- fileutil.write(not_a_dir, "")
- magicfolderdb = abspath_expanduser_unicode(u"magicfolderdb", base=self.basedir)
- doesnotexist = abspath_expanduser_unicode(u"doesnotexist", base=self.basedir)
-
- client = self.g.clients[0]
- d = client.create_dirnode()
- def _made_upload_dir(n):
- self.failUnless(IDirectoryNode.providedBy(n))
- upload_dircap = n.get_uri()
- readonly_dircap = n.get_readonly_uri()
-
- self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
- MagicFolder, client, upload_dircap, doesnotexist, inotify=fake_inotify)
- self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
- MagicFolder, client, upload_dircap, not_a_dir, inotify=fake_inotify)
- self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
- MagicFolder, client, 'bad', errors_dir, inotify=fake_inotify)
- self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
- MagicFolder, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
- self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
- MagicFolder, client, readonly_dircap, errors_dir, inotify=fake_inotify)
- d.addCallback(_made_upload_dir)
- return d
-
- def test_drop_upload(self):
- self.inotify = fake_inotify
- self.basedir = "drop_upload.MockTest.test_drop_upload"
- return self._test()
-
- def notify_close_write(self, path):
- self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
-
-
-class RealTest(MagicFolderTestMixin, unittest.TestCase):
- """This is skipped unless both Twisted and the platform support inotify."""
-
- def test_drop_upload(self):
- self.inotify = None # use the appropriate inotify for the platform
- self.basedir = "drop_upload.RealTest.test_drop_upload"
- return self._test()
-
- def notify_close_write(self, path):
- # Writing to the file causes the notification.
- pass
-
-if sys.platform != "win32" and not runtime.platform.supportsINotify():
- RealTest.skip = "Magic Folder support can only be tested for-real on an OS that supports inotify or equivalent."
--- /dev/null
+
+import os, sys
+
+from twisted.trial import unittest
+from twisted.python import runtime
+from twisted.internet import defer
+
+from allmydata.interfaces import IDirectoryNode, NoSuchChildError
+
+from allmydata.util import fake_inotify, fileutil
+from allmydata.util.encodingutil import get_filesystem_encoding, to_filepath
+from allmydata.util.consumer import download_to_data
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common_util import ReallyEqualMixin, NonASCIIPathMixin
+from allmydata.test.common import ShouldFailMixin
+
+from allmydata.frontends.magic_folder import MagicFolder
+from allmydata.util.fileutil import abspath_expanduser_unicode
+
+
+class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
+ """
+ These tests will be run both with a mock notifier, and (on platforms that support it)
+ with the real INotify.
+ """
+
+ def setUp(self):
+ GridTestMixin.setUp(self)
+ temp = self.mktemp()
+ self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding()))
+ def _get_count(self, name):
+ return self.stats_provider.get_stats()["counters"].get(name, 0)
+
+ def _test(self):
+ self.uploader = None
+ self.set_up_grid()
+ self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
+ self.mkdir_nonascii(self.local_dir)
+
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+
+ d = self.client.create_dirnode()
+ def _made_upload_dir(n):
+ self.failUnless(IDirectoryNode.providedBy(n))
+ self.upload_dirnode = n
+ self.upload_dircap = n.get_uri()
+ self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
+ inotify=self.inotify)
+ return self.uploader.startService()
+ d.addCallback(_made_upload_dir)
+
+ # Write something short enough for a LIT file.
+ d.addCallback(lambda ign: self._test_file(u"short", "test"))
+
+ # Write to the same file again with different data.
+ d.addCallback(lambda ign: self._test_file(u"short", "different"))
+
+ # Test that temporary files are not uploaded.
+ d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+
+ # Test that we tolerate creation of a subdirectory.
+ d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
+
+ # Write something longer, and also try to test a Unicode name if the fs can represent it.
+ name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long")
+ d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+
+ # TODO: test that causes an upload failure.
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
+
+ # Prevent unclean reactor errors.
+ def _cleanup(res):
+ d = defer.succeed(None)
+ if self.uploader is not None:
+ d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
+ d.addCallback(lambda ign: res)
+ return d
+ d.addBoth(_cleanup)
+ return d
+
+ def _test_file(self, name_u, data, temporary=False):
+ previously_uploaded = self._get_count('drop_upload.files_uploaded')
+ previously_disappeared = self._get_count('drop_upload.files_disappeared')
+
+ d = defer.Deferred()
+
+ # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
+ # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
+ self.uploader.set_uploaded_callback(d.callback)
+
+ path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
+ path = to_filepath(path_u)
+
+ # We don't use FilePath.setContent() here because it creates a temporary file that
+ # is renamed into place, which causes events that the test is not expecting.
+ f = open(path_u, "wb")
+ try:
+ if temporary and sys.platform != "win32":
+ os.unlink(path_u)
+ f.write(data)
+ finally:
+ f.close()
+ if temporary and sys.platform == "win32":
+ os.unlink(path_u)
+ fileutil.flush_volume(path_u)
+ self.notify_close_write(path)
+
+ if temporary:
+ d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
+ self.upload_dirnode.get, name_u))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+ previously_disappeared + 1))
+ else:
+ d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
+ d.addCallback(download_to_data)
+ d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+ previously_uploaded + 1))
+
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+ return d
+
+
+class MockTest(MagicFolderTestMixin, unittest.TestCase):
+ """This can run on any platform, and even if twisted.internet.inotify can't be imported."""
+
+ def test_errors(self):
+ self.set_up_grid()
+
+ errors_dir = abspath_expanduser_unicode(u"errors_dir", base=self.basedir)
+ os.mkdir(errors_dir)
+ not_a_dir = abspath_expanduser_unicode(u"NOT_A_DIR", base=self.basedir)
+ fileutil.write(not_a_dir, "")
+ magicfolderdb = abspath_expanduser_unicode(u"magicfolderdb", base=self.basedir)
+ doesnotexist = abspath_expanduser_unicode(u"doesnotexist", base=self.basedir)
+
+ client = self.g.clients[0]
+ d = client.create_dirnode()
+ def _made_upload_dir(n):
+ self.failUnless(IDirectoryNode.providedBy(n))
+ upload_dircap = n.get_uri()
+ readonly_dircap = n.get_readonly_uri()
+
+ self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
+ MagicFolder, client, upload_dircap, doesnotexist, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
+ MagicFolder, client, upload_dircap, not_a_dir, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
+ MagicFolder, client, 'bad', errors_dir, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
+ MagicFolder, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
+ MagicFolder, client, readonly_dircap, errors_dir, inotify=fake_inotify)
+ d.addCallback(_made_upload_dir)
+ return d
+
+ def test_drop_upload(self):
+ self.inotify = fake_inotify
+ self.basedir = "drop_upload.MockTest.test_drop_upload"
+ return self._test()
+
+ def notify_close_write(self, path):
+ self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+
+
+class RealTest(MagicFolderTestMixin, unittest.TestCase):
+ """This is skipped unless both Twisted and the platform support inotify."""
+
+ def test_drop_upload(self):
+ self.inotify = None # use the appropriate inotify for the platform
+ self.basedir = "drop_upload.RealTest.test_drop_upload"
+ return self._test()
+
+ def notify_close_write(self, path):
+ # Writing to the file causes the notification.
+ pass
+
+if sys.platform != "win32" and not runtime.platform.supportsINotify():
+ RealTest.skip = "Magic Folder support can only be tested for-real on an OS that supports inotify or equivalent."