from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath
-
+defer.setDebugging(True)
IN_EXCL_UNLINK = 0x04000000L
def get_inotify_module():
#open("events", "ab+").write(msg)
def _turn_deque(self):
- self._log("_turn_deque")
- if self._stopped:
- self._log("stopped")
- return
try:
- item = self._deque.pop()
- self._log("popped %r" % (item,))
- self._count('objects_queued', -1)
- except IndexError:
- self._log("deque is now empty")
- self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
- else:
- self._lazy_tail.addCallback(lambda ign: self._process(item))
- self._lazy_tail.addBoth(self._call_hook, 'processed')
- self._lazy_tail.addErrback(log.err)
- self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+ self._log("_turn_deque")
+ if self._stopped:
+ self._log("stopped")
+ return
+ try:
+ item = self._deque.pop()
+ self._log("popped %r" % (item,))
+ self._count('objects_queued', -1)
+ except IndexError:
+ self._log("deque is now empty")
+ self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+ else:
+ self._log("_turn_deque else clause")
+ def whawhat(result):
+ self._log("whawhat result %r" % (result,))
+ return result
+ self._lazy_tail.addBoth(whawhat)
+ self._lazy_tail.addCallback(lambda ign: self._process(item))
+ self._lazy_tail.addBoth(self._call_hook, 'processed')
+ self._lazy_tail.addErrback(log.err)
+ self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+ except Exception as e:
+ self._log("turn deque exception %s" % (e,))
+ raise
class Uploader(QueueMixin):
for relpath_u in all_relpaths:
self._add_pending(relpath_u)
- self._periodic_full_scan(ignore_pending=True)
- self._extend_queue_and_keep_going(self._pending)
+ self._full_scan()
def _extend_queue_and_keep_going(self, relpaths_u):
- self._log("queueing %r" % (relpaths_u,))
+ self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
self._deque.extend(relpaths_u)
self._count('objects_queued', len(relpaths_u))
else:
self._clock.callLater(0, self._turn_deque)
- def _periodic_full_scan(self, ignore_pending=False):
- self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
- if ignore_pending:
- self._full_scan()
- else:
- if len(self._pending) == 0:
- self._full_scan()
-
def _full_scan(self):
+ self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
print "FULL SCAN"
self._log("_pending %r" % (self._pending))
self._scan(u"")
+ self._extend_queue_and_keep_going(self._pending)
def _add_pending(self, relpath_u):
+ self._log("add pending %r" % (relpath_u,))
if not magicpath.should_ignore_file(relpath_u):
self._pending.add(relpath_u)
d = defer.succeed(None)
def _maybe_upload(val, now=None):
+ self._log("_maybe_upload(%r, now=%r)" % (val, now))
if now is None:
now = time.time()
fp = self._get_filepath(relpath_u)
self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
return None
elif pathinfo.isdir:
+ print "ISDIR "
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)