class DropUploader(service.MultiService):
name = 'drop-upload'
- def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None):
+ def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
+ pending_delay=1.0):
precondition_abspath(local_dir)
service.MultiService.__init__(self)
self._dbfile = dbfile
if inotify is None:
- from twisted.internet import inotify
+ if sys.platform == "win32":
+ from allmydata.windows import inotify
+ else:
+ from twisted.internet import inotify
self._inotify = inotify
if not self._local_path.exists():
self._uploaded_callback = lambda ign: None
self._notifier = inotify.INotify()
+ if hasattr(self._notifier, 'set_pending_delay'):
+ self._notifier.set_pending_delay(pending_delay)
# We don't watch for IN_CREATE, because that would cause us to read and upload a
# possibly-incomplete file before the application has closed it. There should always
class MockDropUploader(service.MultiService):
name = 'drop-upload'
- def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None):
+ def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
+ pending_delay=1.0):
service.MultiService.__init__(self)
self.client = client
self.upload_dircap = upload_dircap
"""This is skipped unless both Twisted and the platform support inotify."""
def test_drop_upload(self):
- # We should always have runtime.platform.supportsINotify, because we're using
- # Twisted >= 10.1.
- if not runtime.platform.supportsINotify():
- raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.")
-
self.inotify = None # use the appropriate inotify for the platform
self.basedir = "drop_upload.RealTest.test_drop_upload"
return self._test()
def notify_close_write(self, path):
# Writing to the file causes the notification.
pass
+
+if sys.platform != "win32" and not runtime.platform.supportsINotify():
+ RealTest.skip = "Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent."