from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo
-
from allmydata.util.assertutil import precondition
+from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
return service.MultiService.disownServiceParent(self)
-class QueueMixin(object):
+class QueueMixin(HookMixin):
def __init__(self, client, local_path_u, db, name):
self._client = client
self._local_path_u = local_path_u
self._local_path = to_filepath(local_path_u)
self._db = db
self._name = name
+ self._hooks = {'processed': None}
if not self._local_path.exists():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
else:
self._lazy_tail.addCallback(lambda ign: self._process(item))
- #self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,)))
+ self._lazy_tail.addBoth(self._call_hook, 'processed')
+ self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
- def _do_callback(self, res):
- if self._ignore_count == 0:
- self._callback(res)
- else:
- self._ignore_count -= 1
- return None # intentionally suppress failures, which have already been logged
-
- def set_callback(self, callback, ignore_count=0):
- """
- set_callback sets a function that will be called after a filesystem change
- (either local or remote) has been processed, successfully or unsuccessfully.
- """
- self._callback = callback
- self._ignore_count = ignore_count
-
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dircap, inotify, pending_delay):
self._log("%r while processing %r" % (f, path_u))
return f
d.addCallbacks(_succeeded, _failed)
- d.addBoth(self._do_callback)
return d
def _get_metadata(self, encoded_name_u):
self._log("download failed: %s" % (str(f),))
self._count('objects_download_failed')
return f
- def remove_from_pending(ign):
- self._pending.remove(name)
d.addCallbacks(succeeded, failed)
- d.addBoth(self._do_callback)
- d.addCallback(remove_from_pending)
+ def remove_from_pending(res):
+ self._pending.remove(name)
+ return res
+ d.addBoth(remove_from_pending)
return d
def _write_downloaded_file(self, name, file_contents):
def _check_move_empty_tree(res):
self.mkdir_nonascii(empty_tree_dir)
- d2 = defer.Deferred()
- self.magicfolder.uploader.set_callback(d2.callback)
+ d2 = self.magicfolder.uploader.set_hook('processed')
os.rename(empty_tree_dir, new_empty_tree_dir)
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
return d2
def _check_move_small_tree(res):
self.mkdir_nonascii(small_tree_dir)
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
- d2 = defer.Deferred()
- self.magicfolder.uploader.set_callback(d2.callback, ignore_count=1)
+ d2 = self.magicfolder.uploader.set_hook('processed', ignore_count=1)
os.rename(small_tree_dir, new_small_tree_dir)
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
return d2
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
def _check_moved_tree_is_watched(res):
- d2 = defer.Deferred()
- self.magicfolder.uploader.set_callback(d2.callback)
+ d2 = self.magicfolder.uploader.set_hook('processed')
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
return d2
d.addCallback(self._create_magicfolder)
def create_test_file(result):
- d2 = defer.Deferred()
- self.magicfolder.uploader.set_callback(d2.callback)
+ d2 = self.magicfolder.uploader.set_hook('processed')
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
fileutil.write(test_file, "meow")
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
previously_uploaded = self._get_count('uploader.objects_succeeded')
previously_disappeared = self._get_count('uploader.objects_disappeared')
- # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
- # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
- d = defer.Deferred()
- self.magicfolder.uploader.set_callback(d.callback)
+ d = self.magicfolder.uploader.set_hook('processed')
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
path = to_filepath(path_u)
def Alice_wait_for_upload(result):
print "Alice waits for an upload\n"
- d2 = defer.Deferred()
- self.alice_magicfolder.uploader.set_callback(d2.callback)
+ d2 = self.alice_magicfolder.uploader.set_hook('processed')
return d2
d.addCallback(Alice_wait_for_upload)
d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
def Bob_wait_for_download(result):
print "Bob waits for a download\n"
- d2 = defer.Deferred()
- self.bob_magicfolder.downloader.set_callback(d2.callback)
+ d2 = self.bob_magicfolder.downloader.set_hook('processed')
return d2
d.addCallback(Bob_wait_for_download)
d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0))
from twisted.internet import defer, reactor
from allmydata.util import log
+from allmydata.util.assertutil import _assert
from allmydata.util.pollmixin import PollMixin
I am a helper mixin that maintains a collection of named hooks, primarily
for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
and can then be fired exactly once at the appropriate time by '_call_hook'.
+ If 'ignore_count' is given, that number of calls to '_call_hook' will be
+ ignored before firing the hook.
I assume a '_hooks' attribute that should set by the class constructor to
a dict mapping each valid hook name to None.
"""
- def set_hook(self, name, d=None):
+ def set_hook(self, name, d=None, ignore_count=0):
"""
Called by the hook observer (e.g. by a test).
If d is not given, an unfired Deferred is created and returned.
"""
if d is None:
d = defer.Deferred()
- assert self._hooks[name] is None, self._hooks[name]
- assert isinstance(d, defer.Deferred), d
- self._hooks[name] = d
+ _assert(ignore_count >= 0, ignore_count=ignore_count)
+ _assert(name in self._hooks, name=name)
+ _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
+ _assert(isinstance(d, defer.Deferred), d=d)
+
+ self._hooks[name] = (d, ignore_count)
return d
def _call_hook(self, res, name):
"""
- Called to trigger the hook, with argument 'res'. This is a no-op if the
- hook is unset. Otherwise, the hook will be unset, and then its Deferred
- will be fired synchronously.
+ Called to trigger the hook, with argument 'res'. This is a no-op if
+ the hook is unset. If the hook's ignore_count is positive, it will be
+ decremented; if it was already zero, the hook will be unset, and then
+ its Deferred will be fired synchronously.
The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
This ensures that if 'res' is a failure, the hook will be errbacked,
'res' is returned so that the current result or failure will be passed
through.
"""
- d = self._hooks[name]
- if d is None:
+ hook = self._hooks[name]
+ if hook is None:
return defer.succeed(None)
- self._hooks[name] = None
- _with_log(d.callback, res)
+ (d, ignore_count) = hook
+ if ignore_count > 0:
+ self._hooks[name] = (d, ignore_count - 1)
+ else:
+ self._hooks[name] = None
+ _with_log(d.callback, res)
return res