From: david-sarah Date: Mon, 8 Aug 2011 23:40:49 +0000 (-0700) Subject: Drop-upload frontend, rerecorded for 1.9 beta (and correcting a minor mistake). Inclu... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri//%22%22.?a=commitdiff_plain;h=32a7717205ed824a89f77343ed9aaa2c826bedf6;p=tahoe-lafs%2Ftahoe-lafs.git Drop-upload frontend, rerecorded for 1.9 beta (and correcting a minor mistake). Includes some fixes for Windows but not the Windows inotify implementation. fixes #1429 --- diff --git a/src/allmydata/_auto_deps.py b/src/allmydata/_auto_deps.py index 36e20bf1..f998d365 100644 --- a/src/allmydata/_auto_deps.py +++ b/src/allmydata/_auto_deps.py @@ -19,6 +19,8 @@ install_requires = [ "zope.interface == 3.3.1, == 3.5.3, == 3.6.1", # On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32. + # On Linux we need at least Twisted 10.1.0 for inotify support used by the drop-upload + # frontend. # We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to # support asynchronous close. "Twisted >= 10.1.0", diff --git a/src/allmydata/client.py b/src/allmydata/client.py index ac3b2e00..eb3d7679 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -150,6 +150,7 @@ class Client(node.Node, pollmixin.PollMixin): # ControlServer and Helper are attached after Tub startup self.init_ftp_server() self.init_sftp_server() + self.init_drop_uploader() hotline_file = os.path.join(self.basedir, self.SUICIDE_PREVENTION_HOTLINE_FILE) @@ -421,6 +422,22 @@ class Client(node.Node, pollmixin.PollMixin): sftp_portstr, pubkey_file, privkey_file) s.setServiceParent(self) + def init_drop_uploader(self): + if self.get_config("drop_upload", "enabled", False, boolean=True): + upload_uri = self.get_config("drop_upload", "upload.uri", None) + local_dir_utf8 = self.get_config("drop_upload", "local.directory", None) + + if upload_uri and local_dir_utf8: + try: + from allmydata.frontends import drop_upload + s = drop_upload.DropUploader(self, upload_uri, local_dir_utf8) + s.setServiceParent(self) + s.start() + except Exception, e: + self.log("couldn't start drop-uploader: %r", args=(e,)) + else: + self.log("couldn't start drop-uploader: upload.uri or local.directory not specified") + def _check_hotline(self, hotline_file): if os.path.exists(hotline_file): mtime = os.stat(hotline_file)[stat.ST_MTIME] diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py new file mode 100644 index 00000000..71c471bb --- /dev/null +++ b/src/allmydata/frontends/drop_upload.py @@ -0,0 +1,117 @@ + +import os, sys + +from twisted.internet import defer +from twisted.python.filepath import FilePath +from twisted.application import service +from foolscap.api import eventually + +from allmydata.interfaces import IDirectoryNode + +from allmydata.util.encodingutil import quote_output, get_filesystem_encoding +from allmydata.immutable.upload import FileName + + +class DropUploader(service.MultiService): + def __init__(self, client, upload_uri, local_dir_utf8, inotify=None): + service.MultiService.__init__(self) + + try: + local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8')) + if sys.platform == "win32": + local_dir = local_dir_u + else: + local_dir = local_dir_u.encode(get_filesystem_encoding()) + except (UnicodeEncodeError, UnicodeDecodeError): + raise AssertionError("The drop-upload path %s was not valid UTF-8 or could not be represented in the filesystem encoding." + % quote_output(local_dir_utf8)) + + self._client = client + self._stats_provider = client.stats_provider + self._convergence = client.convergence + self._local_path = FilePath(local_dir) + + if inotify is None: + from twisted.internet import inotify + self._inotify = inotify + + if not self._local_path.isdir(): + raise AssertionError("The drop-upload local path %r was not an existing directory." % quote_output(local_dir_u)) + + # TODO: allow a path rather than an URI. + self._parent = self._client.create_node_from_uri(upload_uri) + if not IDirectoryNode.providedBy(self._parent): + raise AssertionError("The drop-upload remote URI is not a directory URI.") + if self._parent.is_unknown() or self._parent.is_readonly(): + raise AssertionError("The drop-upload remote URI does not refer to a writeable directory.") + + self._uploaded_callback = lambda ign: None + + self._notifier = inotify.INotify() + + # We don't watch for IN_CREATE, because that would cause us to read and upload a + # possibly-incomplete file before the application has closed it. There should always + # be an IN_CLOSE_WRITE after an IN_CREATE (I think). + # TODO: what about IN_MOVE_SELF or IN_UNMOUNT? + mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR + self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify]) + + def start(self): + d = self._notifier.startReading() + self._stats_provider.count('drop_upload.dirs_monitored', 1) + return d + + def _notify(self, opaque, path, events_mask): + self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) + + self._stats_provider.count('drop_upload.files_queued', 1) + eventually(self._process, opaque, path, events_mask) + + def _process(self, opaque, path, events_mask): + d = defer.succeed(None) + + # FIXME: if this already exists as a mutable file, we replace the directory entry, + # but we should probably modify the file (as the SFTP frontend does). + def _add_file(ign): + name = path.basename() + # on Windows the name is already Unicode + if not isinstance(name, unicode): + name = name.decode(get_filesystem_encoding()) + + u = FileName(path.path, self._convergence) + return self._parent.add_file(name, u) + d.addCallback(_add_file) + + def _succeeded(ign): + self._stats_provider.count('drop_upload.files_queued', -1) + self._stats_provider.count('drop_upload.files_uploaded', 1) + def _failed(f): + self._stats_provider.count('drop_upload.files_queued', -1) + if path.exists(): + self._log("drop-upload: %r failed to upload due to %r" % (path.path, f)) + self._stats_provider.count('drop_upload.files_failed', 1) + return f + else: + self._log("drop-upload: notified file %r disappeared " + "(this is normal for temporary files): %r" % (path.path, f)) + self._stats_provider.count('drop_upload.files_disappeared', 1) + return None + d.addCallbacks(_succeeded, _failed) + d.addBoth(self._uploaded_callback) + return d + + def set_uploaded_callback(self, callback): + """This sets a function that will be called after a file has been uploaded.""" + self._uploaded_callback = callback + + def finish(self, for_tests=False): + self._notifier.stopReading() + self._stats_provider.count('drop_upload.dirs_monitored', -1) + if for_tests and hasattr(self._notifier, 'wait_until_stopped'): + return self._notifier.wait_until_stopped() + else: + return defer.succeed(None) + + def _log(self, msg): + self._client.log(msg) + #open("events", "ab+").write(msg) diff --git a/src/allmydata/scripts/create_node.py b/src/allmydata/scripts/create_node.py index 41554fa2..9c223bf0 100644 --- a/src/allmydata/scripts/create_node.py +++ b/src/allmydata/scripts/create_node.py @@ -152,6 +152,14 @@ def create_node(config, out=sys.stdout, err=sys.stderr): c.write("enabled = false\n") c.write("\n") + c.write("[drop_upload]\n") + c.write("# Shall this node automatically upload files created or modified in a local directory?\n") + c.write("enabled = false\n") + c.write("# This must be an URI for a writeable directory.\n") + c.write("upload.uri =\n") + c.write("local.directory = ~/drop_upload\n") + c.write("\n") + c.close() from allmydata.util import fileutil diff --git a/src/allmydata/test/test_drop_upload.py b/src/allmydata/test/test_drop_upload.py new file mode 100644 index 00000000..73275be5 --- /dev/null +++ b/src/allmydata/test/test_drop_upload.py @@ -0,0 +1,185 @@ + +import os, sys, platform + +from twisted.trial import unittest +from twisted.python import filepath, runtime +from twisted.internet import defer, base + +from allmydata.interfaces import IDirectoryNode, NoSuchChildError + +from allmydata.util import fileutil, fake_inotify +from allmydata.util.encodingutil import get_filesystem_encoding +from allmydata.util.consumer import download_to_data +from allmydata.test.no_network import GridTestMixin +from allmydata.test.common_util import ReallyEqualMixin +from allmydata.test.common import ShouldFailMixin + +from allmydata.frontends.drop_upload import DropUploader + + +class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin): + """ + These tests will be run both with a mock notifier, and (on platforms that support it) + with the real INotify. + """ + + def _get_count(self, name): + return self.stats_provider.get_stats()["counters"].get(name, 0) + + def _test(self): + self.uploader = None + self.set_up_grid() + dirname_u = u"loc\u0101l_dir" + if sys.platform != "win32": + try: + u"loc\u0101l_dir".encode(get_filesystem_encoding()) + except UnicodeEncodeError: + dirname_u = u"local_dir" + self.local_dir = os.path.join(self.basedir, dirname_u) + os.mkdir(self.local_dir) + + self.client = self.g.clients[0] + self.stats_provider = self.client.stats_provider + + d = self.client.create_dirnode() + def _made_upload_dir(n): + self.failUnless(IDirectoryNode.providedBy(n)) + self.upload_dirnode = n + self.upload_uri = n.get_uri() + self.uploader = DropUploader(self.client, self.upload_uri, self.local_dir.encode('utf-8'), + inotify=self.inotify) + return self.uploader.start() + d.addCallback(_made_upload_dir) + + # Write something short enough for a LIT file. + d.addCallback(lambda ign: self._test_file(u"short", "test")) + + # Write to the same file again with different data. + d.addCallback(lambda ign: self._test_file(u"short", "different")) + + # Test that temporary files are not uploaded. + d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True)) + + # Test that we tolerate creation of a subdirectory. + d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory"))) + + # Write something longer, and also try to test a Unicode name if the fs can represent it. + name_u = u"l\u00F8ng" + if sys.platform != "win32": + try: + u"l\u00F8ng".encode(get_filesystem_encoding()) + except UnicodeEncodeError: + name_u = u"long" + d.addCallback(lambda ign: self._test_file(name_u, "test"*100)) + + # TODO: test that causes an upload failure. + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0)) + + # Prevent unclean reactor errors. + def _cleanup(res): + d = defer.succeed(None) + if self.uploader is not None: + d.addCallback(lambda ign: self.uploader.finish(for_tests=True)) + d.addCallback(lambda ign: res) + return d + d.addBoth(_cleanup) + return d + + def _test_file(self, name_u, data, temporary=False): + previously_uploaded = self._get_count('drop_upload.files_uploaded') + previously_disappeared = self._get_count('drop_upload.files_disappeared') + + d = defer.Deferred() + + # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file + # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that? + self.uploader.set_uploaded_callback(d.callback) + + path_u = os.path.join(self.local_dir, name_u) + if sys.platform == "win32": + path = filepath.FilePath(path_u) + else: + path = filepath.FilePath(path_u.encode(get_filesystem_encoding())) + + f = open(path.path, "wb") + try: + if temporary and sys.platform != "win32": + os.unlink(path.path) + f.write(data) + finally: + f.close() + if temporary and sys.platform == "win32": + os.unlink(path.path) + self.notify_close_write(path) + + if temporary: + d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None, + self.upload_dirnode.get, name_u)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'), + previously_disappeared + 1)) + else: + d.addCallback(lambda ign: self.upload_dirnode.get(name_u)) + d.addCallback(download_to_data) + d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), + previously_uploaded + 1)) + + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0)) + return d + + +class MockTest(DropUploadTestMixin, unittest.TestCase): + """This can run on any platform, and even if twisted.internet.inotify can't be imported.""" + + def test_errors(self): + self.basedir = "drop_upload.MockTest.test_errors" + self.set_up_grid() + errors_dir = os.path.join(self.basedir, "errors_dir") + os.mkdir(errors_dir) + + client = self.g.clients[0] + d = client.create_dirnode() + def _made_upload_dir(n): + self.failUnless(IDirectoryNode.providedBy(n)) + upload_uri = n.get_uri() + readonly_uri = n.get_readonly_uri() + + self.shouldFail(AssertionError, 'invalid local dir', 'could not be represented', + DropUploader, client, upload_uri, '\xFF', inotify=fake_inotify) + self.shouldFail(AssertionError, 'non-existant local dir', 'not an existing directory', + DropUploader, client, upload_uri, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify) + + self.shouldFail(AssertionError, 'bad URI', 'not a directory URI', + DropUploader, client, 'bad', errors_dir, inotify=fake_inotify) + self.shouldFail(AssertionError, 'non-directory URI', 'not a directory URI', + DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify) + self.shouldFail(AssertionError, 'readonly directory URI', 'does not refer to a writeable directory', + DropUploader, client, readonly_uri, errors_dir, inotify=fake_inotify) + d.addCallback(_made_upload_dir) + return d + + def test_drop_upload(self): + self.inotify = fake_inotify + self.basedir = "drop_upload.MockTest.test_drop_upload" + return self._test() + + def notify_close_write(self, path): + self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE) + + +class RealTest(DropUploadTestMixin, unittest.TestCase): + """This is skipped unless both Twisted and the platform support inotify.""" + + def test_drop_upload(self): + # We should always have runtime.platform.supportsINotify, because we're using + # Twisted >= 10.1. + if not runtime.platform.supportsINotify(): + raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.") + + self.inotify = None # use the appropriate inotify for the platform + self.basedir = "drop_upload.RealTest.test_drop_upload" + return self._test() + + def notify_close_write(self, path): + # Writing to the file causes the notification. + pass diff --git a/src/allmydata/test/test_runner.py b/src/allmydata/test/test_runner.py index db2078d3..18d3786a 100644 --- a/src/allmydata/test/test_runner.py +++ b/src/allmydata/test/test_runner.py @@ -253,6 +253,8 @@ class CreateNode(unittest.TestCase): self.failUnless(re.search(r"\n\[storage\]\n#.*\nenabled = true\n", content), content) self.failUnless("\nreserved_space = 1G\n" in content) + self.failUnless(re.search(r"\n\[drop_upload\]\n#.*\nenabled = false\n", content), content) + # creating the node a second time should be rejected rc, out, err = self.run_tahoe(argv) self.failIfEqual(rc, 0, str((out, err, rc))) diff --git a/src/allmydata/util/fake_inotify.py b/src/allmydata/util/fake_inotify.py new file mode 100644 index 00000000..793c5309 --- /dev/null +++ b/src/allmydata/util/fake_inotify.py @@ -0,0 +1,99 @@ + +# Most of this is copied from Twisted 11.0. The reason for this hack is that +# twisted.internet.inotify can't be imported when the platform does not support inotify. + + +# from /usr/src/linux/include/linux/inotify.h + +IN_ACCESS = 0x00000001L # File was accessed +IN_MODIFY = 0x00000002L # File was modified +IN_ATTRIB = 0x00000004L # Metadata changed +IN_CLOSE_WRITE = 0x00000008L # Writeable file was closed +IN_CLOSE_NOWRITE = 0x00000010L # Unwriteable file closed +IN_OPEN = 0x00000020L # File was opened +IN_MOVED_FROM = 0x00000040L # File was moved from X +IN_MOVED_TO = 0x00000080L # File was moved to Y +IN_CREATE = 0x00000100L # Subfile was created +IN_DELETE = 0x00000200L # Subfile was delete +IN_DELETE_SELF = 0x00000400L # Self was deleted +IN_MOVE_SELF = 0x00000800L # Self was moved +IN_UNMOUNT = 0x00002000L # Backing fs was unmounted +IN_Q_OVERFLOW = 0x00004000L # Event queued overflowed +IN_IGNORED = 0x00008000L # File was ignored + +IN_ONLYDIR = 0x01000000 # only watch the path if it is a directory +IN_DONT_FOLLOW = 0x02000000 # don't follow a sym link +IN_MASK_ADD = 0x20000000 # add to the mask of an already existing watch +IN_ISDIR = 0x40000000 # event occurred against dir +IN_ONESHOT = 0x80000000 # only send event once + +IN_CLOSE = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE # closes +IN_MOVED = IN_MOVED_FROM | IN_MOVED_TO # moves +IN_CHANGED = IN_MODIFY | IN_ATTRIB # changes + +IN_WATCH_MASK = (IN_MODIFY | IN_ATTRIB | + IN_CREATE | IN_DELETE | + IN_DELETE_SELF | IN_MOVE_SELF | + IN_UNMOUNT | IN_MOVED_FROM | IN_MOVED_TO) + + +_FLAG_TO_HUMAN = [ + (IN_ACCESS, 'access'), + (IN_MODIFY, 'modify'), + (IN_ATTRIB, 'attrib'), + (IN_CLOSE_WRITE, 'close_write'), + (IN_CLOSE_NOWRITE, 'close_nowrite'), + (IN_OPEN, 'open'), + (IN_MOVED_FROM, 'moved_from'), + (IN_MOVED_TO, 'moved_to'), + (IN_CREATE, 'create'), + (IN_DELETE, 'delete'), + (IN_DELETE_SELF, 'delete_self'), + (IN_MOVE_SELF, 'move_self'), + (IN_UNMOUNT, 'unmount'), + (IN_Q_OVERFLOW, 'queue_overflow'), + (IN_IGNORED, 'ignored'), + (IN_ONLYDIR, 'only_dir'), + (IN_DONT_FOLLOW, 'dont_follow'), + (IN_MASK_ADD, 'mask_add'), + (IN_ISDIR, 'is_dir'), + (IN_ONESHOT, 'one_shot') +] + + + +def humanReadableMask(mask): + """ + Auxiliary function that converts an hexadecimal mask into a series + of human readable flags. + """ + s = [] + for k, v in _FLAG_TO_HUMAN: + if k & mask: + s.append(v) + return s + + +# This class is not copied from Twisted; it acts as a mock. +class INotify(object): + def startReading(self): + pass + + def stopReading(self): + pass + + def watch(self, filepath, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): + self.callbacks = callbacks + + def event(self, filepath, mask): + for cb in self.callbacks: + cb(None, filepath, mask) + + +__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS", + "IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE", "IN_CLOSE_WRITE", + "IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE", + "IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT", + "IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW", + "IN_MASK_ADD", "IN_ISDIR", "IN_ONESHOT", "IN_CLOSE", + "IN_MOVED", "IN_CHANGED"] diff --git a/src/allmydata/web/statistics.xhtml b/src/allmydata/web/statistics.xhtml index 3246fd80..b10d6793 100644 --- a/src/allmydata/web/statistics.xhtml +++ b/src/allmydata/web/statistics.xhtml @@ -9,6 +9,8 @@

Node Statistics

+

General

+ +

Drop-Uploader

+ + +

Raw Stats:

 
diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py
index fa31f8ec..cdce5ee1 100644
--- a/src/allmydata/web/status.py
+++ b/src/allmydata/web/status.py
@@ -1290,6 +1290,23 @@ class Statistics(rend.Page):
         return "%s files / %s bytes (%s)" % (files, bytes,
                                              abbreviate_size(bytes))
 
+    def render_drop_monitored(self, ctx, data):
+        dirs = data["counters"].get("drop_upload.dirs_monitored", 0)
+        return "%s directories" % (dirs,)
+
+    def render_drop_uploads(self, ctx, data):
+        # TODO: bytes uploaded
+        files = data["counters"].get("drop_upload.files_uploaded", 0)
+        return "%s files" % (files,)
+
+    def render_drop_queued(self, ctx, data):
+        files = data["counters"].get("drop_upload.files_queued", 0)
+        return "%s files" % (files,)
+
+    def render_drop_failed(self, ctx, data):
+        files = data["counters"].get("drop_upload.files_failed", 0)
+        return "%s files" % (files,)
+
     def render_raw(self, ctx, data):
         raw = pprint.pformat(data)
         return ctx.tag[raw]