"zope.interface == 3.3.1, == 3.5.3, == 3.6.1",
# On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32.
+ # On Linux we need at least Twisted 10.1.0 for inotify support used by the drop-upload
+ # frontend.
# We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to
# support asynchronous close.
"Twisted >= 10.1.0",
# ControlServer and Helper are attached after Tub startup
self.init_ftp_server()
self.init_sftp_server()
+ self.init_drop_uploader()
hotline_file = os.path.join(self.basedir,
self.SUICIDE_PREVENTION_HOTLINE_FILE)
sftp_portstr, pubkey_file, privkey_file)
s.setServiceParent(self)
+ def init_drop_uploader(self):
+ if self.get_config("drop_upload", "enabled", False, boolean=True):
+ upload_uri = self.get_config("drop_upload", "upload.uri", None)
+ local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
+
+ if upload_uri and local_dir_utf8:
+ try:
+ from allmydata.frontends import drop_upload
+ s = drop_upload.DropUploader(self, upload_uri, local_dir_utf8)
+ s.setServiceParent(self)
+ s.start()
+ except Exception, e:
+ self.log("couldn't start drop-uploader: %r", args=(e,))
+ else:
+ self.log("couldn't start drop-uploader: upload.uri or local.directory not specified")
+
def _check_hotline(self, hotline_file):
if os.path.exists(hotline_file):
mtime = os.stat(hotline_file)[stat.ST_MTIME]
--- /dev/null
+
+import os, sys
+
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
+from twisted.application import service
+from foolscap.api import eventually
+
+from allmydata.interfaces import IDirectoryNode
+
+from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
+from allmydata.immutable.upload import FileName
+
+
+class DropUploader(service.MultiService):
+ def __init__(self, client, upload_uri, local_dir_utf8, inotify=None):
+ service.MultiService.__init__(self)
+
+ try:
+ local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8'))
+ if sys.platform == "win32":
+ local_dir = local_dir_u
+ else:
+ local_dir = local_dir_u.encode(get_filesystem_encoding())
+ except (UnicodeEncodeError, UnicodeDecodeError):
+ raise AssertionError("The drop-upload path %s was not valid UTF-8 or could not be represented in the filesystem encoding."
+ % quote_output(local_dir_utf8))
+
+ self._client = client
+ self._stats_provider = client.stats_provider
+ self._convergence = client.convergence
+ self._local_path = FilePath(local_dir)
+
+ if inotify is None:
+ from twisted.internet import inotify
+ self._inotify = inotify
+
+ if not self._local_path.isdir():
+ raise AssertionError("The drop-upload local path %r was not an existing directory." % quote_output(local_dir_u))
+
+ # TODO: allow a path rather than an URI.
+ self._parent = self._client.create_node_from_uri(upload_uri)
+ if not IDirectoryNode.providedBy(self._parent):
+ raise AssertionError("The drop-upload remote URI is not a directory URI.")
+ if self._parent.is_unknown() or self._parent.is_readonly():
+ raise AssertionError("The drop-upload remote URI does not refer to a writeable directory.")
+
+ self._uploaded_callback = lambda ign: None
+
+ self._notifier = inotify.INotify()
+
+ # We don't watch for IN_CREATE, because that would cause us to read and upload a
+ # possibly-incomplete file before the application has closed it. There should always
+ # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
+ # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
+ mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
+ self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
+
+ def start(self):
+ d = self._notifier.startReading()
+ self._stats_provider.count('drop_upload.dirs_monitored', 1)
+ return d
+
+ def _notify(self, opaque, path, events_mask):
+ self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+
+ self._stats_provider.count('drop_upload.files_queued', 1)
+ eventually(self._process, opaque, path, events_mask)
+
+ def _process(self, opaque, path, events_mask):
+ d = defer.succeed(None)
+
+ # FIXME: if this already exists as a mutable file, we replace the directory entry,
+ # but we should probably modify the file (as the SFTP frontend does).
+ def _add_file(ign):
+ name = path.basename()
+ # on Windows the name is already Unicode
+ if not isinstance(name, unicode):
+ name = name.decode(get_filesystem_encoding())
+
+ u = FileName(path.path, self._convergence)
+ return self._parent.add_file(name, u)
+ d.addCallback(_add_file)
+
+ def _succeeded(ign):
+ self._stats_provider.count('drop_upload.files_queued', -1)
+ self._stats_provider.count('drop_upload.files_uploaded', 1)
+ def _failed(f):
+ self._stats_provider.count('drop_upload.files_queued', -1)
+ if path.exists():
+ self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
+ self._stats_provider.count('drop_upload.files_failed', 1)
+ return f
+ else:
+ self._log("drop-upload: notified file %r disappeared "
+ "(this is normal for temporary files): %r" % (path.path, f))
+ self._stats_provider.count('drop_upload.files_disappeared', 1)
+ return None
+ d.addCallbacks(_succeeded, _failed)
+ d.addBoth(self._uploaded_callback)
+ return d
+
+ def set_uploaded_callback(self, callback):
+ """This sets a function that will be called after a file has been uploaded."""
+ self._uploaded_callback = callback
+
+ def finish(self, for_tests=False):
+ self._notifier.stopReading()
+ self._stats_provider.count('drop_upload.dirs_monitored', -1)
+ if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
+ return self._notifier.wait_until_stopped()
+ else:
+ return defer.succeed(None)
+
+ def _log(self, msg):
+ self._client.log(msg)
+ #open("events", "ab+").write(msg)
c.write("enabled = false\n")
c.write("\n")
+ c.write("[drop_upload]\n")
+ c.write("# Shall this node automatically upload files created or modified in a local directory?\n")
+ c.write("enabled = false\n")
+ c.write("# This must be an URI for a writeable directory.\n")
+ c.write("upload.uri =\n")
+ c.write("local.directory = ~/drop_upload\n")
+ c.write("\n")
+
c.close()
from allmydata.util import fileutil
--- /dev/null
+
+import os, sys, platform
+
+from twisted.trial import unittest
+from twisted.python import filepath, runtime
+from twisted.internet import defer, base
+
+from allmydata.interfaces import IDirectoryNode, NoSuchChildError
+
+from allmydata.util import fileutil, fake_inotify
+from allmydata.util.encodingutil import get_filesystem_encoding
+from allmydata.util.consumer import download_to_data
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common_util import ReallyEqualMixin
+from allmydata.test.common import ShouldFailMixin
+
+from allmydata.frontends.drop_upload import DropUploader
+
+
+class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin):
+ """
+ These tests will be run both with a mock notifier, and (on platforms that support it)
+ with the real INotify.
+ """
+
+ def _get_count(self, name):
+ return self.stats_provider.get_stats()["counters"].get(name, 0)
+
+ def _test(self):
+ self.uploader = None
+ self.set_up_grid()
+ dirname_u = u"loc\u0101l_dir"
+ if sys.platform != "win32":
+ try:
+ u"loc\u0101l_dir".encode(get_filesystem_encoding())
+ except UnicodeEncodeError:
+ dirname_u = u"local_dir"
+ self.local_dir = os.path.join(self.basedir, dirname_u)
+ os.mkdir(self.local_dir)
+
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+
+ d = self.client.create_dirnode()
+ def _made_upload_dir(n):
+ self.failUnless(IDirectoryNode.providedBy(n))
+ self.upload_dirnode = n
+ self.upload_uri = n.get_uri()
+ self.uploader = DropUploader(self.client, self.upload_uri, self.local_dir.encode('utf-8'),
+ inotify=self.inotify)
+ return self.uploader.start()
+ d.addCallback(_made_upload_dir)
+
+ # Write something short enough for a LIT file.
+ d.addCallback(lambda ign: self._test_file(u"short", "test"))
+
+ # Write to the same file again with different data.
+ d.addCallback(lambda ign: self._test_file(u"short", "different"))
+
+ # Test that temporary files are not uploaded.
+ d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+
+ # Test that we tolerate creation of a subdirectory.
+ d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
+
+ # Write something longer, and also try to test a Unicode name if the fs can represent it.
+ name_u = u"l\u00F8ng"
+ if sys.platform != "win32":
+ try:
+ u"l\u00F8ng".encode(get_filesystem_encoding())
+ except UnicodeEncodeError:
+ name_u = u"long"
+ d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+
+ # TODO: test that causes an upload failure.
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
+
+ # Prevent unclean reactor errors.
+ def _cleanup(res):
+ d = defer.succeed(None)
+ if self.uploader is not None:
+ d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
+ d.addCallback(lambda ign: res)
+ return d
+ d.addBoth(_cleanup)
+ return d
+
+ def _test_file(self, name_u, data, temporary=False):
+ previously_uploaded = self._get_count('drop_upload.files_uploaded')
+ previously_disappeared = self._get_count('drop_upload.files_disappeared')
+
+ d = defer.Deferred()
+
+ # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
+ # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
+ self.uploader.set_uploaded_callback(d.callback)
+
+ path_u = os.path.join(self.local_dir, name_u)
+ if sys.platform == "win32":
+ path = filepath.FilePath(path_u)
+ else:
+ path = filepath.FilePath(path_u.encode(get_filesystem_encoding()))
+
+ f = open(path.path, "wb")
+ try:
+ if temporary and sys.platform != "win32":
+ os.unlink(path.path)
+ f.write(data)
+ finally:
+ f.close()
+ if temporary and sys.platform == "win32":
+ os.unlink(path.path)
+ self.notify_close_write(path)
+
+ if temporary:
+ d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
+ self.upload_dirnode.get, name_u))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+ previously_disappeared + 1))
+ else:
+ d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
+ d.addCallback(download_to_data)
+ d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+ previously_uploaded + 1))
+
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+ return d
+
+
+class MockTest(DropUploadTestMixin, unittest.TestCase):
+ """This can run on any platform, and even if twisted.internet.inotify can't be imported."""
+
+ def test_errors(self):
+ self.basedir = "drop_upload.MockTest.test_errors"
+ self.set_up_grid()
+ errors_dir = os.path.join(self.basedir, "errors_dir")
+ os.mkdir(errors_dir)
+
+ client = self.g.clients[0]
+ d = client.create_dirnode()
+ def _made_upload_dir(n):
+ self.failUnless(IDirectoryNode.providedBy(n))
+ upload_uri = n.get_uri()
+ readonly_uri = n.get_readonly_uri()
+
+ self.shouldFail(AssertionError, 'invalid local dir', 'could not be represented',
+ DropUploader, client, upload_uri, '\xFF', inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'non-existant local dir', 'not an existing directory',
+ DropUploader, client, upload_uri, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify)
+
+ self.shouldFail(AssertionError, 'bad URI', 'not a directory URI',
+ DropUploader, client, 'bad', errors_dir, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'non-directory URI', 'not a directory URI',
+ DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+ self.shouldFail(AssertionError, 'readonly directory URI', 'does not refer to a writeable directory',
+ DropUploader, client, readonly_uri, errors_dir, inotify=fake_inotify)
+ d.addCallback(_made_upload_dir)
+ return d
+
+ def test_drop_upload(self):
+ self.inotify = fake_inotify
+ self.basedir = "drop_upload.MockTest.test_drop_upload"
+ return self._test()
+
+ def notify_close_write(self, path):
+ self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+
+
+class RealTest(DropUploadTestMixin, unittest.TestCase):
+ """This is skipped unless both Twisted and the platform support inotify."""
+
+ def test_drop_upload(self):
+ # We should always have runtime.platform.supportsINotify, because we're using
+ # Twisted >= 10.1.
+ if not runtime.platform.supportsINotify():
+ raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.")
+
+ self.inotify = None # use the appropriate inotify for the platform
+ self.basedir = "drop_upload.RealTest.test_drop_upload"
+ return self._test()
+
+ def notify_close_write(self, path):
+ # Writing to the file causes the notification.
+ pass
self.failUnless(re.search(r"\n\[storage\]\n#.*\nenabled = true\n", content), content)
self.failUnless("\nreserved_space = 1G\n" in content)
+ self.failUnless(re.search(r"\n\[drop_upload\]\n#.*\nenabled = false\n", content), content)
+
# creating the node a second time should be rejected
rc, out, err = self.run_tahoe(argv)
self.failIfEqual(rc, 0, str((out, err, rc)))
--- /dev/null
+
+# Most of this is copied from Twisted 11.0. The reason for this hack is that
+# twisted.internet.inotify can't be imported when the platform does not support inotify.
+
+
+# from /usr/src/linux/include/linux/inotify.h
+
+IN_ACCESS = 0x00000001L # File was accessed
+IN_MODIFY = 0x00000002L # File was modified
+IN_ATTRIB = 0x00000004L # Metadata changed
+IN_CLOSE_WRITE = 0x00000008L # Writeable file was closed
+IN_CLOSE_NOWRITE = 0x00000010L # Unwriteable file closed
+IN_OPEN = 0x00000020L # File was opened
+IN_MOVED_FROM = 0x00000040L # File was moved from X
+IN_MOVED_TO = 0x00000080L # File was moved to Y
+IN_CREATE = 0x00000100L # Subfile was created
+IN_DELETE = 0x00000200L # Subfile was delete
+IN_DELETE_SELF = 0x00000400L # Self was deleted
+IN_MOVE_SELF = 0x00000800L # Self was moved
+IN_UNMOUNT = 0x00002000L # Backing fs was unmounted
+IN_Q_OVERFLOW = 0x00004000L # Event queued overflowed
+IN_IGNORED = 0x00008000L # File was ignored
+
+IN_ONLYDIR = 0x01000000 # only watch the path if it is a directory
+IN_DONT_FOLLOW = 0x02000000 # don't follow a sym link
+IN_MASK_ADD = 0x20000000 # add to the mask of an already existing watch
+IN_ISDIR = 0x40000000 # event occurred against dir
+IN_ONESHOT = 0x80000000 # only send event once
+
+IN_CLOSE = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE # closes
+IN_MOVED = IN_MOVED_FROM | IN_MOVED_TO # moves
+IN_CHANGED = IN_MODIFY | IN_ATTRIB # changes
+
+IN_WATCH_MASK = (IN_MODIFY | IN_ATTRIB |
+ IN_CREATE | IN_DELETE |
+ IN_DELETE_SELF | IN_MOVE_SELF |
+ IN_UNMOUNT | IN_MOVED_FROM | IN_MOVED_TO)
+
+
+_FLAG_TO_HUMAN = [
+ (IN_ACCESS, 'access'),
+ (IN_MODIFY, 'modify'),
+ (IN_ATTRIB, 'attrib'),
+ (IN_CLOSE_WRITE, 'close_write'),
+ (IN_CLOSE_NOWRITE, 'close_nowrite'),
+ (IN_OPEN, 'open'),
+ (IN_MOVED_FROM, 'moved_from'),
+ (IN_MOVED_TO, 'moved_to'),
+ (IN_CREATE, 'create'),
+ (IN_DELETE, 'delete'),
+ (IN_DELETE_SELF, 'delete_self'),
+ (IN_MOVE_SELF, 'move_self'),
+ (IN_UNMOUNT, 'unmount'),
+ (IN_Q_OVERFLOW, 'queue_overflow'),
+ (IN_IGNORED, 'ignored'),
+ (IN_ONLYDIR, 'only_dir'),
+ (IN_DONT_FOLLOW, 'dont_follow'),
+ (IN_MASK_ADD, 'mask_add'),
+ (IN_ISDIR, 'is_dir'),
+ (IN_ONESHOT, 'one_shot')
+]
+
+
+
+def humanReadableMask(mask):
+ """
+ Auxiliary function that converts an hexadecimal mask into a series
+ of human readable flags.
+ """
+ s = []
+ for k, v in _FLAG_TO_HUMAN:
+ if k & mask:
+ s.append(v)
+ return s
+
+
+# This class is not copied from Twisted; it acts as a mock.
+class INotify(object):
+ def startReading(self):
+ pass
+
+ def stopReading(self):
+ pass
+
+ def watch(self, filepath, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
+ self.callbacks = callbacks
+
+ def event(self, filepath, mask):
+ for cb in self.callbacks:
+ cb(None, filepath, mask)
+
+
+__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS",
+ "IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE", "IN_CLOSE_WRITE",
+ "IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE",
+ "IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT",
+ "IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW",
+ "IN_MASK_ADD", "IN_ISDIR", "IN_ONESHOT", "IN_CLOSE",
+ "IN_MOVED", "IN_CHANGED"]
<h1>Node Statistics</h1>
+<h2>General</h2>
+
<ul>
<li>Load Average: <span n:render="load_average" /></li>
<li>Peak Load: <span n:render="peak_load" /></li>
<li>Files Retrieved (mutable): <span n:render="retrieves" /></li>
</ul>
+<h2>Drop-Uploader</h2>
+
+<ul>
+ <li>Local Directories Monitored: <span n:render="drop_monitored" /></li>
+ <li>Files Uploaded: <span n:render="drop_uploads" /></li>
+ <li>File Changes Queued: <span n:render="drop_queued" /></li>
+ <li>Failed Uploads: <span n:render="drop_failed" /></li>
+</ul>
+
<h2>Raw Stats:</h2>
<pre n:render="raw" />
return "%s files / %s bytes (%s)" % (files, bytes,
abbreviate_size(bytes))
+ def render_drop_monitored(self, ctx, data):
+ dirs = data["counters"].get("drop_upload.dirs_monitored", 0)
+ return "%s directories" % (dirs,)
+
+ def render_drop_uploads(self, ctx, data):
+ # TODO: bytes uploaded
+ files = data["counters"].get("drop_upload.files_uploaded", 0)
+ return "%s files" % (files,)
+
+ def render_drop_queued(self, ctx, data):
+ files = data["counters"].get("drop_upload.files_queued", 0)
+ return "%s files" % (files,)
+
+ def render_drop_failed(self, ctx, data):
+ files = data["counters"].get("drop_upload.files_failed", 0)
+ return "%s files" % (files,)
+
def render_raw(self, ctx, data):
raw = pprint.pformat(data)
return ctx.tag[raw]