-import sys
+import sys, os, stat
+from collections import deque
-from twisted.internet import defer
-from twisted.python.filepath import FilePath
+from twisted.internet import defer, reactor, task
+from twisted.python.failure import Failure
from twisted.application import service
-from foolscap.api import eventually
-from allmydata.interfaces import IDirectoryNode
+from allmydata.interfaces import IDirectoryNode, NoSuchChildError
from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
service.MultiService.__init__(self)
self._local_dir = abspath_expanduser_unicode(local_dir)
+ self._upload_lazy_tail = defer.succeed(None)
+ self._pending = set()
self._client = client
self._stats_provider = client.stats_provider
self._convergence = client.convergence
self._local_path = to_filepath(self._local_dir)
self._dbfile = dbfile
+ self._upload_deque = deque()
self.is_upload_ready = False
if inotify is None:
# possibly-incomplete file before the application has closed it. There should always
# be an IN_CLOSE_WRITE after an IN_CREATE (I think).
# TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
- mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
- self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
+ self.mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
+ self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
+ autoAdd=True, recursive=True)
def _check_db_file(self, childpath):
# returns True if the file must be uploaded.
if filecap is False:
return True
+ def _scan(self, localpath):
+ if not os.path.isdir(localpath):
+ raise AssertionError("Programmer error: _scan() must be passed a directory path.")
+ quoted_path = quote_local_unicode_path(localpath)
+ try:
+ children = listdir_unicode(localpath)
+ except EnvironmentError:
+ raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,)))
+ except FilenameEncodingError:
+ raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,)))
+
+ for child in children:
+ assert isinstance(child, unicode), child
+ childpath = os.path.join(localpath, child)
+ # note: symlinks to directories are both islink() and isdir()
+ isdir = os.path.isdir(childpath)
+ isfile = os.path.isfile(childpath)
+ islink = os.path.islink(childpath)
+
+ if islink:
+ self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(childpath))
+ elif isdir:
+ must_upload = self._check_db_file(childpath)
+ if must_upload:
+ self._append_to_deque(childpath)
+
+ # recurse on the child directory
+ self._scan(childpath)
+ elif isfile:
+ must_upload = self._check_db_file(childpath)
+ if must_upload:
+ self._append_to_deque(childpath)
+ else:
+ self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(childpath))
+
def startService(self):
self._db = backupdb.get_backupdb(self._dbfile)
if self._db is None:
service.MultiService.startService(self)
d = self._notifier.startReading()
+
+ self._scan(self._local_dir)
+
self._stats_provider.count('drop_upload.dirs_monitored', 1)
return d
+ def _add_to_dequeue(self, path):
+ # XXX stub function. fix me later.
+ #print "adding file to upload queue %s" % (path,)
+ pass
+
+ def Pause(self):
+ self.is_upload_ready = False
+
+ def Resume(self):
+ self.is_upload_ready = True
+ # XXX
+ self._turn_deque()
+
def upload_ready(self):
"""upload_ready is used to signal us to start
processing the upload items...
"""
self.is_upload_ready = True
+ self._turn_deque()
+
+ def _append_to_deque(self, path):
+ self._upload_deque.append(path)
+ self._pending.add(path)
+ self._stats_provider.count('drop_upload.objects_queued', 1)
+ if self.is_upload_ready:
+ reactor.callLater(0, self._turn_deque)
+
+ def _turn_deque(self):
+ try:
+ path = self._upload_deque.pop()
+ except IndexError:
+ self._log("magic folder upload deque is now empty")
+ self._upload_lazy_tail = defer.succeed(None)
+ return
+ self._upload_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
+ self._upload_lazy_tail.addCallback(lambda ign: self._turn_deque())
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+ path_u = unicode_from_filepath(path)
+ if path_u not in self._pending:
+ self._append_to_deque(path_u)
- self._stats_provider.count('drop_upload.objects_queued', 1)
- eventually(self._process, opaque, path, events_mask)
-
- def _process(self, opaque, path, events_mask):
+ def _process(self, path):
d = defer.succeed(None)
- # FIXME: if this already exists as a mutable file, we replace the directory entry,
- # but we should probably modify the file (as the SFTP frontend does).
- def _add_file(ign):
- name = path.basename()
- # on Windows the name is already Unicode
- if not isinstance(name, unicode):
- name = name.decode(get_filesystem_encoding())
-
- u = FileName(path.path, self._convergence)
+ # FIXME (ticket #1712): if this already exists as a mutable file, we replace the
+ # directory entry, but we should probably modify the file (as the SFTP frontend does).
+ def _add_file(ignore, name):
+ u = FileName(path, self._convergence)
return self._parent.add_file(name, u)
- d.addCallback(_add_file)
+
+ def _add_dir(ignore, name):
+ self._notifier.watch(to_filepath(path), mask=self.mask, callbacks=[self._notify], autoAdd=True, recursive=True)
+ d2 = self._parent.create_subdirectory(name)
+ d2.addCallback(lambda ign: self._log("created subdirectory %r" % (path,)))
+ d2.addCallback(lambda ign: self._scan(path))
+ return d2
+
+ def _maybe_upload(val):
+ self._pending.remove(path)
+ name = os.path.basename(path)
+
+ if not os.path.exists(path):
+ self._log("uploader: not uploading non-existent file.")
+ self._stats_provider.count('drop_upload.objects_disappeared', 1)
+ return NoSuchChildError("not uploading non-existent file")
+ elif os.path.islink(path):
+ self._log("operator ERROR: symlink not being processed.")
+ return Failure()
+
+ if os.path.isdir(path):
+ d.addCallback(_add_dir, name)
+ self._stats_provider.count('drop_upload.directories_created', 1)
+ return None
+ elif os.path.isfile(path):
+ d.addCallback(_add_file, name)
+ def add_db_entry(filenode):
+ filecap = filenode.get_uri()
+ s = os.stat(path)
+ size = s[stat.ST_SIZE]
+ ctime = s[stat.ST_CTIME]
+ mtime = s[stat.ST_MTIME]
+ self._db.did_upload_file(filecap, path, mtime, ctime, size)
+ d.addCallback(add_db_entry)
+ self._stats_provider.count('drop_upload.files_uploaded', 1)
+ return None
+ else:
+ self._log("operator ERROR: non-directory/non-regular file not being processed.")
+ return Failure()
+
+ d.addCallback(_maybe_upload)
def _succeeded(ign):
self._stats_provider.count('drop_upload.objects_queued', -1)
else:
return defer.succeed(None)
+ def remove_service(self):
+ return service.MultiService.disownServiceParent(self)
+
def _log(self, msg):
self._client.log(msg)
#open("events", "ab+").write(msg)
-import os, sys
+import os, sys, stat, time
from twisted.trial import unittest
from twisted.python import runtime
from allmydata.test.common import ShouldFailMixin
from allmydata.frontends.drop_upload import DropUploader
+from allmydata.scripts import backupdb
from allmydata.util.fileutil import abspath_expanduser_unicode
GridTestMixin.setUp(self)
temp = self.mktemp()
self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding()))
+ self.uploader = None
+ self.dir_node = None
+
def _get_count(self, name):
return self.stats_provider.get_stats()["counters"].get(name, 0)
- def _test(self):
- self.uploader = None
+ def _createdb(self):
+ dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.basedir)
+ bdb = backupdb.get_backupdb(dbfile)
+ self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
+ self.failUnlessEqual(bdb.VERSION, 2)
+ return bdb
+
+ def _made_upload_dir(self, n):
+ if self.dir_node == None:
+ self.dir_node = n
+ else:
+ n = self.dir_node
+ self.failUnless(IDirectoryNode.providedBy(n))
+ self.upload_dirnode = n
+ self.upload_dircap = n.get_uri()
+
+ def _create_uploader(self, ign):
+ dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.basedir)
+ self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir,
+ dbfile, inotify=self.inotify, pending_delay=0.2)
+ self.uploader.setServiceParent(self.client)
+ self.uploader.upload_ready()
+ self.failUnlessEqual(self.uploader._db.VERSION, 2)
+
+ # Prevent unclean reactor errors.
+ def _cleanup(self, res):
+ d = defer.succeed(None)
+ if self.uploader is not None:
+ d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
+ d.addCallback(lambda ign: res)
+ return d
+
+ def test_db_basic(self):
+ fileutil.make_dirs(self.basedir)
+ self._createdb()
+
+ def test_db_persistence(self):
+ """Test that a file upload creates an entry in the database."""
+
+ fileutil.make_dirs(self.basedir)
+ db = self._createdb()
+
+ path = abspath_expanduser_unicode(u"myFile1", base=self.basedir)
+ db.did_upload_file('URI:LIT:1', path, 0, 0, 33)
+
+ c = db.cursor
+ c.execute("SELECT size,mtime,ctime,fileid"
+ " FROM local_files"
+ " WHERE path=?",
+ (path,))
+ row = db.cursor.fetchone()
+ self.failIfEqual(row, None)
+
+ # Second test uses db.check_file instead of SQL query directly
+ # to confirm the previous upload entry in the db.
+ path = abspath_expanduser_unicode(u"myFile2", base=self.basedir)
+ fileutil.write(path, "meow\n")
+ s = os.stat(path)
+ size = s[stat.ST_SIZE]
+ ctime = s[stat.ST_CTIME]
+ mtime = s[stat.ST_MTIME]
+ db.did_upload_file('URI:LIT:2', path, mtime, ctime, size)
+ r = db.check_file(path)
+ self.failUnless(r.was_uploaded())
+
+ def test_uploader_start_service(self):
+ self.set_up_grid()
+
+ self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+ base=self.basedir)
+ self.mkdir_nonascii(self.local_dir)
+
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+
+ d = self.client.create_dirnode()
+ d.addCallback(self._made_upload_dir)
+ d.addCallback(self._create_uploader)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.dirs_monitored'), 1))
+ d.addBoth(self._cleanup)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.dirs_monitored'), 0))
+ return d
+
+ def test_move_tree(self):
+ self.set_up_grid()
+
+ self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+ base=self.basedir)
+ self.mkdir_nonascii(self.local_dir)
+
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+
+ empty_tree_name = self.unicode_or_fallback(u"empty_tr\u00EAe", u"empty_tree")
+ empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir)
+ new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir)
+
+ small_tree_name = self.unicode_or_fallback(u"small_tr\u00EAe", u"empty_tree")
+ small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.basedir)
+ new_small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.local_dir)
+
+ d = self.client.create_dirnode()
+ d.addCallback(self._made_upload_dir)
+
+ d.addCallback(self._create_uploader)
+
+ def _check_move_empty_tree(res):
+ self.mkdir_nonascii(empty_tree_dir)
+ d2 = defer.Deferred()
+ self.uploader.set_uploaded_callback(d2.callback, ignore_count=0)
+ os.rename(empty_tree_dir, new_empty_tree_dir)
+ self.notify_close_write(to_filepath(new_empty_tree_dir))
+ return d2
+ d.addCallback(_check_move_empty_tree)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 1))
+
+ def _check_move_small_tree(res):
+ self.mkdir_nonascii(small_tree_dir)
+ fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
+ d2 = defer.Deferred()
+ self.uploader.set_uploaded_callback(d2.callback, ignore_count=1)
+ os.rename(small_tree_dir, new_small_tree_dir)
+ self.notify_close_write(to_filepath(new_small_tree_dir))
+ return d2
+ d.addCallback(_check_move_small_tree)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
+
+ def _check_moved_tree_is_watched(res):
+ d2 = defer.Deferred()
+ self.uploader.set_uploaded_callback(d2.callback, ignore_count=0)
+ fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
+ self.notify_close_write(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)))
+ return d2
+ d.addCallback(_check_moved_tree_is_watched)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 4))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'), 2))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.directories_created'), 2))
+
+ d.addBoth(self._cleanup)
+ return d
+
+ def test_persistence(self):
+ """
+ Perform an upload of a given file and then stop the client.
+ Start a new client and uploader... and verify that the file is NOT uploaded
+ a second time. This test is meant to test the database persistence along with
+ the startup and shutdown code paths of the uploader.
+ """
+ self.set_up_grid()
+ self.local_dir = abspath_expanduser_unicode(u"test_persistence", base=self.basedir)
+ self.mkdir_nonascii(self.local_dir)
+
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+ d = self.client.create_dirnode()
+ d.addCallback(self._made_upload_dir)
+ d.addCallback(self._create_uploader)
+
+ def create_file(val):
+ d2 = defer.Deferred()
+ self.uploader.set_uploaded_callback(d2.callback)
+ test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
+ fileutil.write(test_file, "meow")
+ self.notify_close_write(to_filepath(test_file))
+ return d2
+ d.addCallback(create_file)
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 1))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
+ d.addCallback(self._cleanup)
+
+ def _restart(ign):
+ self.set_up_grid()
+ self.client = self.g.clients[0]
+ self.stats_provider = self.client.stats_provider
+ d.addCallback(_restart)
+ d.addCallback(self._create_uploader)
+ d.addCallback(lambda ign: time.sleep(3))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
+ d.addBoth(self._cleanup)
+ return d
+
+ def test_drop_upload(self):
self.set_up_grid()
self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
self.mkdir_nonascii(self.local_dir)
self.stats_provider = self.client.stats_provider
d = self.client.create_dirnode()
- def _made_upload_dir(n):
- self.failUnless(IDirectoryNode.providedBy(n))
- self.upload_dirnode = n
- self.upload_dircap = n.get_uri()
- self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
- inotify=self.inotify)
- return self.uploader.startService()
- d.addCallback(_made_upload_dir)
+
+ d.addCallback(self._made_upload_dir)
+ d.addCallback(self._create_uploader)
# Write something short enough for a LIT file.
- d.addCallback(lambda ign: self._test_file(u"short", "test"))
+ d.addCallback(lambda ign: self._check_file(u"short", "test"))
# Write to the same file again with different data.
- d.addCallback(lambda ign: self._test_file(u"short", "different"))
+ d.addCallback(lambda ign: self._check_file(u"short", "different"))
# Test that temporary files are not uploaded.
- d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+ d.addCallback(lambda ign: self._check_file(u"tempfile", "test", temporary=True))
# Test that we tolerate creation of a subdirectory.
d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
# Write something longer, and also try to test a Unicode name if the fs can represent it.
name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long")
- d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+ d.addCallback(lambda ign: self._check_file(name_u, "test"*100))
# TODO: test that causes an upload failure.
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
- # Prevent unclean reactor errors.
- def _cleanup(res):
- d = defer.succeed(None)
- if self.uploader is not None:
- d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
- d.addCallback(lambda ign: res)
- return d
- d.addBoth(_cleanup)
+ d.addBoth(self._cleanup)
return d
- def _test_file(self, name_u, data, temporary=False):
- previously_uploaded = self._get_count('drop_upload.files_uploaded')
- previously_disappeared = self._get_count('drop_upload.files_disappeared')
+ def _check_file(self, name_u, data, temporary=False):
+ previously_uploaded = self._get_count('drop_upload.objects_uploaded')
+ previously_disappeared = self._get_count('drop_upload.objects_disappeared')
d = defer.Deferred()
if temporary:
d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
self.upload_dirnode.get, name_u))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_disappeared'),
previously_disappeared + 1))
else:
d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
d.addCallback(download_to_data)
d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_uploaded'),
previously_uploaded + 1))
- d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+ d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.objects_queued'), 0))
return d
class MockTest(DropUploadTestMixin, unittest.TestCase):
"""This can run on any platform, and even if twisted.internet.inotify can't be imported."""
+ def setUp(self):
+ DropUploadTestMixin.setUp(self)
+ self.inotify = fake_inotify
+
+ def notify_close_write(self, path):
+ self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+
def test_errors(self):
self.set_up_grid()
client = self.g.clients[0]
d = client.create_dirnode()
- def _made_upload_dir(n):
+ def _check_errors(n):
self.failUnless(IDirectoryNode.providedBy(n))
upload_dircap = n.get_uri()
readonly_dircap = n.get_readonly_uri()
self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
- DropUploader, client, upload_dircap, doesnotexist, inotify=fake_inotify)
+ DropUploader, client, upload_dircap, doesnotexist, magicfolderdb, inotify=fake_inotify)
self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
- DropUploader, client, upload_dircap, not_a_dir, inotify=fake_inotify)
+ DropUploader, client, upload_dircap, not_a_dir, magicfolderdb, inotify=fake_inotify)
self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
- DropUploader, client, 'bad', errors_dir, inotify=fake_inotify)
+ DropUploader, client, 'bad', errors_dir, magicfolderdb, inotify=fake_inotify)
self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
- DropUploader, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+ DropUploader, client, 'URI:LIT:foo', errors_dir, magicfolderdb, inotify=fake_inotify)
self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
- DropUploader, client, readonly_dircap, errors_dir, inotify=fake_inotify)
- d.addCallback(_made_upload_dir)
+ DropUploader, client, readonly_dircap, errors_dir, magicfolderdb, inotify=fake_inotify)
+ d.addCallback(_check_errors)
return d
- def test_drop_upload(self):
- self.inotify = fake_inotify
- self.basedir = "drop_upload.MockTest.test_drop_upload"
- return self._test()
-
- def notify_close_write(self, path):
- self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
-
class RealTest(DropUploadTestMixin, unittest.TestCase):
"""This is skipped unless both Twisted and the platform support inotify."""
- def test_drop_upload(self):
- self.inotify = None # use the appropriate inotify for the platform
- self.basedir = "drop_upload.RealTest.test_drop_upload"
- return self._test()
+ def setUp(self):
+ DropUploadTestMixin.setUp(self)
+ self.inotify = None
def notify_close_write(self, path):
# Writing to the file causes the notification.