self._pending = set()
self._callback = lambda ign: None
self._ignore_count = 0
+ self._stopped = False
+ self._turn_delay = 0
def _count(self, counter_name, delta=1):
self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)
def _log(self, msg):
- self._client.log("Magic Folder %s: %s" % (self._name, msg))
- #print "_log %s" % (msg,)
+ s = "Magic Folder %s: %s" % (self._name, msg)
+ self._client.log(s)
+ print s
#open("events", "ab+").write(msg)
def _append_to_deque(self, path):
reactor.callLater(0, self._turn_deque)
def _turn_deque(self):
+ if self._stopped:
+ return
try:
- path = self._deque.pop()
+ item = self._deque.pop()
except IndexError:
- self._log("magic folder upload deque is now empty")
- self._lazy_tail = defer.succeed(None)
- return
- self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
- self._lazy_tail.addCallback(lambda ign: self._turn_deque())
+ self._log("deque is now empty")
+ self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+ else:
+ self._lazy_tail.addCallback(lambda ign: self._process(item))
+ #self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,)))
+ self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
def _do_callback(self, res):
if self._ignore_count == 0:
path_u = unicode_from_filepath(path)
self._append_to_deque(path_u)
+ def _when_queue_is_empty(self):
+ return defer.succeed(None)
+
def _process(self, path_u):
precondition(isinstance(path_u, unicode), path_u)
d = defer.succeed(None)
if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
- self._remote_scan_delay = 3 # XXX
+ self._turn_delay = 3 # delay between remote scans
self._download_scan_batch = {} # path -> [(filenode, metadata)]
- self._stopped = False
def start_scanning(self):
self._scan_remote_collective()
extension += [(name, file_node, metadata)]
return extension
- def _download_file(self, name, file_node):
+ def _when_queue_is_empty(self):
+ d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective)
+ d.addCallback(lambda ign: self._turn_deque())
+ return d
+
+ def _process(self, item):
+ (name, file_node, metadata) = item
d = file_node.download_best_version()
def succeeded(res):
d.addCallback(lambda result: self._write_downloaded_file(name, result))
self._count('download_objects_queued')
if self.is_ready:
reactor.callLater(0, self._turn_deque)
-
- # FIXME move to QueueMixin
- def _turn_deque(self):
- if self._stopped:
- return
- try:
- file_path, file_node, metadata = self._deque.pop()
- except IndexError:
- self._log("magic folder upload deque is now empty")
- self._lazy_tail = defer.succeed(None)
- self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
- self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque))
- return
- self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
- self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))