From: david-sarah <>
Date: Mon, 8 Aug 2011 23:40:49 +0000 (-0700)
Subject: Drop-upload frontend, rerecorded for 1.9 beta (and correcting a minor mistake). Inclu... 

Drop-upload frontend, rerecorded for 1.9 beta (and correcting a minor mistake). Includes some fixes for Windows but not the Windows inotify implementation. fixes #1429

diff --git a/src/allmydata/ b/src/allmydata/
index 36e20bf1..f998d365 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -19,6 +19,8 @@ install_requires = [
     "zope.interface == 3.3.1, == 3.5.3, == 3.6.1",
     # On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32.
+    # On Linux we need at least Twisted 10.1.0 for inotify support used by the drop-upload
+    # frontend.
     # We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to
     # support asynchronous close.
     "Twisted >= 10.1.0",
diff --git a/src/allmydata/ b/src/allmydata/
index ac3b2e00..eb3d7679 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -150,6 +150,7 @@ class Client(node.Node, pollmixin.PollMixin):
         # ControlServer and Helper are attached after Tub startup
+        self.init_drop_uploader()
         hotline_file = os.path.join(self.basedir,
@@ -421,6 +422,22 @@ class Client(node.Node, pollmixin.PollMixin):
                                  sftp_portstr, pubkey_file, privkey_file)
+    def init_drop_uploader(self):
+        if self.get_config("drop_upload", "enabled", False, boolean=True):
+            upload_uri = self.get_config("drop_upload", "upload.uri", None)
+            local_dir_utf8 = self.get_config("drop_upload", "", None)
+            if upload_uri and local_dir_utf8:
+                try:
+                    from allmydata.frontends import drop_upload
+                    s = drop_upload.DropUploader(self, upload_uri, local_dir_utf8)
+                    s.setServiceParent(self)
+                    s.start()
+                except Exception, e:
+                    self.log("couldn't start drop-uploader: %r", args=(e,))
+            else:
+                self.log("couldn't start drop-uploader: upload.uri or not specified")
     def _check_hotline(self, hotline_file):
         if os.path.exists(hotline_file):
             mtime = os.stat(hotline_file)[stat.ST_MTIME]
diff --git a/src/allmydata/frontends/ b/src/allmydata/frontends/
new file mode 100644
index 00000000..71c471bb
--- /dev/null
+++ b/src/allmydata/frontends/
@@ -0,0 +1,117 @@
+import os, sys
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
+from twisted.application import service
+from foolscap.api import eventually
+from allmydata.interfaces import IDirectoryNode
+from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
+from allmydata.immutable.upload import FileName
+class DropUploader(service.MultiService):
+    def __init__(self, client, upload_uri, local_dir_utf8, inotify=None):
+        service.MultiService.__init__(self)
+        try:
+            local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8'))
+            if sys.platform == "win32":
+                local_dir = local_dir_u
+            else:
+                local_dir = local_dir_u.encode(get_filesystem_encoding())
+        except (UnicodeEncodeError, UnicodeDecodeError):
+            raise AssertionError("The drop-upload path %s was not valid UTF-8 or could not be represented in the filesystem encoding."
+                                 % quote_output(local_dir_utf8))
+        self._client = client
+        self._stats_provider = client.stats_provider
+        self._convergence = client.convergence
+        self._local_path = FilePath(local_dir)
+        if inotify is None:
+            from twisted.internet import inotify
+        self._inotify = inotify
+        if not self._local_path.isdir():
+            raise AssertionError("The drop-upload local path %r was not an existing directory." % quote_output(local_dir_u))
+        # TODO: allow a path rather than an URI.
+        self._parent = self._client.create_node_from_uri(upload_uri)
+        if not IDirectoryNode.providedBy(self._parent):
+            raise AssertionError("The drop-upload remote URI is not a directory URI.")
+        if self._parent.is_unknown() or self._parent.is_readonly():
+            raise AssertionError("The drop-upload remote URI does not refer to a writeable directory.")
+        self._uploaded_callback = lambda ign: None
+        self._notifier = inotify.INotify()
+        # We don't watch for IN_CREATE, because that would cause us to read and upload a
+        # possibly-incomplete file before the application has closed it. There should always
+        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
+        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
+        mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
+, mask=mask, callbacks=[self._notify])
+    def start(self):
+        d = self._notifier.startReading()
+        self._stats_provider.count('drop_upload.dirs_monitored', 1)
+        return d
+    def _notify(self, opaque, path, events_mask):
+        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+        self._stats_provider.count('drop_upload.files_queued', 1)
+        eventually(self._process, opaque, path, events_mask)
+    def _process(self, opaque, path, events_mask):
+        d = defer.succeed(None)
+        # FIXME: if this already exists as a mutable file, we replace the directory entry,
+        # but we should probably modify the file (as the SFTP frontend does).
+        def _add_file(ign):
+            name = path.basename()
+            # on Windows the name is already Unicode
+            if not isinstance(name, unicode):
+                name = name.decode(get_filesystem_encoding())
+            u = FileName(path.path, self._convergence)
+            return self._parent.add_file(name, u)
+        d.addCallback(_add_file)
+        def _succeeded(ign):
+            self._stats_provider.count('drop_upload.files_queued', -1)
+            self._stats_provider.count('drop_upload.files_uploaded', 1)
+        def _failed(f):
+            self._stats_provider.count('drop_upload.files_queued', -1)
+            if path.exists():
+                self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
+                self._stats_provider.count('drop_upload.files_failed', 1)
+                return f
+            else:
+                self._log("drop-upload: notified file %r disappeared "
+                          "(this is normal for temporary files): %r" % (path.path, f))
+                self._stats_provider.count('drop_upload.files_disappeared', 1)
+                return None
+        d.addCallbacks(_succeeded, _failed)
+        d.addBoth(self._uploaded_callback)
+        return d
+    def set_uploaded_callback(self, callback):
+        """This sets a function that will be called after a file has been uploaded."""
+        self._uploaded_callback = callback
+    def finish(self, for_tests=False):
+        self._notifier.stopReading()
+        self._stats_provider.count('drop_upload.dirs_monitored', -1)
+        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
+            return self._notifier.wait_until_stopped()
+        else:
+            return defer.succeed(None)
+    def _log(self, msg):
+        self._client.log(msg)
+        #open("events", "ab+").write(msg)
diff --git a/src/allmydata/scripts/ b/src/allmydata/scripts/
index 41554fa2..9c223bf0 100644
--- a/src/allmydata/scripts/
+++ b/src/allmydata/scripts/
@@ -152,6 +152,14 @@ def create_node(config, out=sys.stdout, err=sys.stderr):
     c.write("enabled = false\n")
+    c.write("[drop_upload]\n")
+    c.write("# Shall this node automatically upload files created or modified in a local directory?\n")
+    c.write("enabled = false\n")
+    c.write("# This must be an URI for a writeable directory.\n")
+    c.write("upload.uri =\n")
+    c.write(" = ~/drop_upload\n")
+    c.write("\n")
     from allmydata.util import fileutil
diff --git a/src/allmydata/test/ b/src/allmydata/test/
new file mode 100644
index 00000000..73275be5
--- /dev/null
+++ b/src/allmydata/test/
@@ -0,0 +1,185 @@
+import os, sys, platform
+from twisted.trial import unittest
+from twisted.python import filepath, runtime
+from twisted.internet import defer, base
+from allmydata.interfaces import IDirectoryNode, NoSuchChildError
+from allmydata.util import fileutil, fake_inotify
+from allmydata.util.encodingutil import get_filesystem_encoding
+from allmydata.util.consumer import download_to_data
+from allmydata.test.no_network import GridTestMixin
+from allmydata.test.common_util import ReallyEqualMixin
+from allmydata.test.common import ShouldFailMixin
+from allmydata.frontends.drop_upload import DropUploader
+class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin):
+    """
+    These tests will be run both with a mock notifier, and (on platforms that support it)
+    with the real INotify.
+    """
+    def _get_count(self, name):
+        return self.stats_provider.get_stats()["counters"].get(name, 0)
+    def _test(self):
+        self.uploader = None
+        self.set_up_grid()
+        dirname_u = u"loc\u0101l_dir"
+        if sys.platform != "win32":
+            try:
+                u"loc\u0101l_dir".encode(get_filesystem_encoding())
+            except UnicodeEncodeError:
+                dirname_u = u"local_dir"
+        self.local_dir = os.path.join(self.basedir, dirname_u)
+        os.mkdir(self.local_dir)
+        self.client = self.g.clients[0]
+        self.stats_provider = self.client.stats_provider
+        d = self.client.create_dirnode()
+        def _made_upload_dir(n):
+            self.failUnless(IDirectoryNode.providedBy(n))
+            self.upload_dirnode = n
+            self.upload_uri = n.get_uri()
+            self.uploader = DropUploader(self.client, self.upload_uri, self.local_dir.encode('utf-8'),
+                                         inotify=self.inotify)
+            return self.uploader.start()
+        d.addCallback(_made_upload_dir)
+        # Write something short enough for a LIT file.
+        d.addCallback(lambda ign: self._test_file(u"short", "test"))
+        # Write to the same file again with different data.
+        d.addCallback(lambda ign: self._test_file(u"short", "different"))
+        # Test that temporary files are not uploaded.
+        d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+        # Test that we tolerate creation of a subdirectory.
+        d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
+        # Write something longer, and also try to test a Unicode name if the fs can represent it.
+        name_u = u"l\u00F8ng"
+        if sys.platform != "win32":
+            try:
+                u"l\u00F8ng".encode(get_filesystem_encoding())
+            except UnicodeEncodeError:
+                name_u = u"long"
+        d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+        # TODO: test that causes an upload failure.
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
+        # Prevent unclean reactor errors.
+        def _cleanup(res):
+            d = defer.succeed(None)
+            if self.uploader is not None:
+                d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
+            d.addCallback(lambda ign: res)
+            return d
+        d.addBoth(_cleanup)
+        return d
+    def _test_file(self, name_u, data, temporary=False):
+        previously_uploaded = self._get_count('drop_upload.files_uploaded')
+        previously_disappeared = self._get_count('drop_upload.files_disappeared')
+        d = defer.Deferred()
+        # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
+        # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
+        self.uploader.set_uploaded_callback(d.callback)
+        path_u = os.path.join(self.local_dir, name_u)
+        if sys.platform == "win32":
+            path = filepath.FilePath(path_u)
+        else:
+            path = filepath.FilePath(path_u.encode(get_filesystem_encoding()))
+        f = open(path.path, "wb")
+        try:
+            if temporary and sys.platform != "win32":
+                os.unlink(path.path)
+            f.write(data)
+        finally:
+            f.close()
+        if temporary and sys.platform == "win32":
+            os.unlink(path.path)
+        self.notify_close_write(path)
+        if temporary:
+            d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
+                                                      self.upload_dirnode.get, name_u))
+            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+                                                                 previously_disappeared + 1))
+        else:
+            d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
+            d.addCallback(download_to_data)
+            d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
+            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+                                                                 previously_uploaded + 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+        return d
+class MockTest(DropUploadTestMixin, unittest.TestCase):
+    """This can run on any platform, and even if twisted.internet.inotify can't be imported."""
+    def test_errors(self):
+        self.basedir = "drop_upload.MockTest.test_errors"
+        self.set_up_grid()
+        errors_dir = os.path.join(self.basedir, "errors_dir")
+        os.mkdir(errors_dir)
+        client = self.g.clients[0]
+        d = client.create_dirnode()
+        def _made_upload_dir(n):
+            self.failUnless(IDirectoryNode.providedBy(n))
+            upload_uri = n.get_uri()
+            readonly_uri = n.get_readonly_uri()
+            self.shouldFail(AssertionError, 'invalid local dir', 'could not be represented',
+                            DropUploader, client, upload_uri, '\xFF', inotify=fake_inotify)
+            self.shouldFail(AssertionError, 'non-existant local dir', 'not an existing directory',
+                            DropUploader, client, upload_uri, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify)
+            self.shouldFail(AssertionError, 'bad URI', 'not a directory URI',
+                            DropUploader, client, 'bad', errors_dir, inotify=fake_inotify)
+            self.shouldFail(AssertionError, 'non-directory URI', 'not a directory URI',
+                            DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+            self.shouldFail(AssertionError, 'readonly directory URI', 'does not refer to a writeable directory',
+                            DropUploader, client, readonly_uri, errors_dir, inotify=fake_inotify)
+        d.addCallback(_made_upload_dir)
+        return d
+    def test_drop_upload(self):
+        self.inotify = fake_inotify
+        self.basedir = "drop_upload.MockTest.test_drop_upload"
+        return self._test()
+    def notify_close_write(self, path):
+        self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+class RealTest(DropUploadTestMixin, unittest.TestCase):
+    """This is skipped unless both Twisted and the platform support inotify."""
+    def test_drop_upload(self):
+        # We should always have runtime.platform.supportsINotify, because we're using
+        # Twisted >= 10.1.
+        if not runtime.platform.supportsINotify():
+            raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.")
+        self.inotify = None  # use the appropriate inotify for the platform
+        self.basedir = "drop_upload.RealTest.test_drop_upload"
+        return self._test()
+    def notify_close_write(self, path):
+        # Writing to the file causes the notification.
+        pass
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index db2078d3..18d3786a 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -253,6 +253,8 @@ class CreateNode(unittest.TestCase):
                 self.failUnless("\n\[storage\]\n#.*\nenabled = true\n", content), content)
                 self.failUnless("\nreserved_space = 1G\n" in content)
+            self.failUnless("\n\[drop_upload\]\n#.*\nenabled = false\n", content), content)
         # creating the node a second time should be rejected
         rc, out, err = self.run_tahoe(argv)
         self.failIfEqual(rc, 0, str((out, err, rc)))
diff --git a/src/allmydata/util/ b/src/allmydata/util/
new file mode 100644
index 00000000..793c5309
--- /dev/null
+++ b/src/allmydata/util/
@@ -0,0 +1,99 @@
+# Most of this is copied from Twisted 11.0. The reason for this hack is that
+# twisted.internet.inotify can't be imported when the platform does not support inotify.
+# from /usr/src/linux/include/linux/inotify.h
+IN_ACCESS = 0x00000001L         # File was accessed
+IN_MODIFY = 0x00000002L         # File was modified
+IN_ATTRIB = 0x00000004L         # Metadata changed
+IN_CLOSE_WRITE = 0x00000008L    # Writeable file was closed
+IN_CLOSE_NOWRITE = 0x00000010L  # Unwriteable file closed
+IN_OPEN = 0x00000020L           # File was opened
+IN_MOVED_FROM = 0x00000040L     # File was moved from X
+IN_MOVED_TO = 0x00000080L       # File was moved to Y
+IN_CREATE = 0x00000100L         # Subfile was created
+IN_DELETE = 0x00000200L         # Subfile was delete
+IN_DELETE_SELF = 0x00000400L    # Self was deleted
+IN_MOVE_SELF = 0x00000800L      # Self was moved
+IN_UNMOUNT = 0x00002000L        # Backing fs was unmounted
+IN_Q_OVERFLOW = 0x00004000L     # Event queued overflowed
+IN_IGNORED = 0x00008000L        # File was ignored
+IN_ONLYDIR = 0x01000000         # only watch the path if it is a directory
+IN_DONT_FOLLOW = 0x02000000     # don't follow a sym link
+IN_MASK_ADD = 0x20000000        # add to the mask of an already existing watch
+IN_ISDIR = 0x40000000           # event occurred against dir
+IN_ONESHOT = 0x80000000         # only send event once
+IN_MOVED = IN_MOVED_FROM | IN_MOVED_TO           # moves
+IN_CHANGED = IN_MODIFY | IN_ATTRIB               # changes
+                 IN_CREATE | IN_DELETE |
+                 IN_DELETE_SELF | IN_MOVE_SELF |
+                 IN_UNMOUNT | IN_MOVED_FROM | IN_MOVED_TO)
+    (IN_ACCESS, 'access'),
+    (IN_MODIFY, 'modify'),
+    (IN_ATTRIB, 'attrib'),
+    (IN_CLOSE_WRITE, 'close_write'),
+    (IN_CLOSE_NOWRITE, 'close_nowrite'),
+    (IN_OPEN, 'open'),
+    (IN_MOVED_FROM, 'moved_from'),
+    (IN_MOVED_TO, 'moved_to'),
+    (IN_CREATE, 'create'),
+    (IN_DELETE, 'delete'),
+    (IN_DELETE_SELF, 'delete_self'),
+    (IN_MOVE_SELF, 'move_self'),
+    (IN_UNMOUNT, 'unmount'),
+    (IN_Q_OVERFLOW, 'queue_overflow'),
+    (IN_IGNORED, 'ignored'),
+    (IN_ONLYDIR, 'only_dir'),
+    (IN_DONT_FOLLOW, 'dont_follow'),
+    (IN_MASK_ADD, 'mask_add'),
+    (IN_ISDIR, 'is_dir'),
+    (IN_ONESHOT, 'one_shot')
+def humanReadableMask(mask):
+    """
+    Auxiliary function that converts an hexadecimal mask into a series
+    of human readable flags.
+    """
+    s = []
+    for k, v in _FLAG_TO_HUMAN:
+        if k & mask:
+            s.append(v)
+    return s
+# This class is not copied from Twisted; it acts as a mock.
+class INotify(object):
+    def startReading(self):
+        pass
+    def stopReading(self):
+        pass
+    def watch(self, filepath, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
+        self.callbacks = callbacks
+    def event(self, filepath, mask):
+        for cb in self.callbacks:
+            cb(None, filepath, mask)
+__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS",
+           "IN_MASK_ADD", "IN_ISDIR", "IN_ONESHOT", "IN_CLOSE",
+           "IN_MOVED", "IN_CHANGED"]
diff --git a/src/allmydata/web/statistics.xhtml b/src/allmydata/web/statistics.xhtml
index 3246fd80..b10d6793 100644
--- a/src/allmydata/web/statistics.xhtml
+++ b/src/allmydata/web/statistics.xhtml
@@ -9,6 +9,8 @@
 <h1>Node Statistics</h1>
   <li>Load Average: <span n:render="load_average" /></li>
   <li>Peak Load: <span n:render="peak_load" /></li>
@@ -18,6 +20,15 @@
   <li>Files Retrieved (mutable): <span n:render="retrieves" /></li>
+  <li>Local Directories Monitored: <span n:render="drop_monitored" /></li>
+  <li>Files Uploaded: <span n:render="drop_uploads" /></li>
+  <li>File Changes Queued: <span n:render="drop_queued" /></li>
+  <li>Failed Uploads: <span n:render="drop_failed" /></li>
 <h2>Raw Stats:</h2>
 <pre n:render="raw" />
diff --git a/src/allmydata/web/ b/src/allmydata/web/
index fa31f8ec..cdce5ee1 100644
--- a/src/allmydata/web/
+++ b/src/allmydata/web/
@@ -1290,6 +1290,23 @@ class Statistics(rend.Page):
         return "%s files / %s bytes (%s)" % (files, bytes,
+    def render_drop_monitored(self, ctx, data):
+        dirs = data["counters"].get("drop_upload.dirs_monitored", 0)
+        return "%s directories" % (dirs,)
+    def render_drop_uploads(self, ctx, data):
+        # TODO: bytes uploaded
+        files = data["counters"].get("drop_upload.files_uploaded", 0)
+        return "%s files" % (files,)
+    def render_drop_queued(self, ctx, data):
+        files = data["counters"].get("drop_upload.files_queued", 0)
+        return "%s files" % (files,)
+    def render_drop_failed(self, ctx, data):
+        files = data["counters"].get("drop_upload.files_failed", 0)
+        return "%s files" % (files,)
     def render_raw(self, ctx, data):
         raw = pprint.pformat(data)
         return ctx.tag[raw]