From: David Stainton Date: Thu, 17 Dec 2015 16:19:26 +0000 (+0000) Subject: Daira's excellent fixes to the uploader from our pairing session X-Git-Url: https://git.rkrishnan.org/%5B/frontends/flags/index.php?a=commitdiff_plain;h=243ca2884a07d6a38862ea29cae3f85b171508e3;p=tahoe-lafs%2Ftahoe-lafs.git Daira's excellent fixes to the uploader from our pairing session --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 06cebf29..315ee704 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -91,10 +91,8 @@ class MagicFolder(service.MultiService): """ready is used to signal us to start processing the upload and download items... """ - d = self.uploader.start_scanning() - d2 = self.downloader.start_scanning() - d.addCallback(lambda ign: d2) - return d + self.uploader.start_uploading() # synchronous + return self.downloader.start_downloading() def finish(self): print "finish" @@ -194,7 +192,7 @@ class Uploader(QueueMixin): self._upload_dirnode = upload_dirnode self._inotify = get_inotify_module() self._notifier = self._inotify.INotify() - self._pending = set() + self._pending = set() # of unicode relpaths if hasattr(self._notifier, 'set_pending_delay'): self._notifier.set_pending_delay(pending_delay) @@ -231,19 +229,43 @@ class Uploader(QueueMixin): d.addCallback(lambda ign: self._lazy_tail) return d - def start_scanning(self): - self._log("start_scanning") + def start_uploading(self): + self._log("start_uploading") self.is_ready = True - return self._full_scan() + + all_relpaths = self._db.get_all_relpaths() + self._log("all relpaths: %r" % (all_relpaths,)) + + for relpath_u in all_relpaths: + self._add_pending(relpath_u) + + self._full_scan() + self._extend_queue_and_keep_going(self._pending) + + def _extend_queue_and_keep_going(self, relpaths_u): + self._log("queueing %r" % (relpaths_u,)) + self._deque.extend(relpaths_u) + self._count('objects_queued', len(relpaths_u)) + + if self.is_ready: + if self._immediate: # for tests + self._turn_deque() + else: + self._clock.callLater(0, self._turn_deque) def _full_scan(self): print "FULL SCAN" - self._pending = self._db.get_all_relpaths() self._log("all_files %r" % (self._pending)) - d = self._scan(u"") - return d + self._scan(u"") + + def _add_pending(self, relpath_u): + if not magicpath.should_ignore_file(relpath_u): + self._pending.add(relpath_u) def _scan(self, reldir_u): + # Scan a directory by (synchronously) adding the paths of all its children to self._pending. + # Note that this doesn't add them to the deque -- that will + self._log("scan %r" % (reldir_u,)) fp = self._get_filepath(reldir_u) try: @@ -255,22 +277,9 @@ class Uploader(QueueMixin): raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % quote_filepath(fp)) - d = defer.succeed(None) for child in children: _assert(isinstance(child, unicode), child=child) - d.addCallback(lambda ign, child=child: - ("%s/%s" % (reldir_u, child) if reldir_u else child)) - def _add_pending(relpath_u): - if magicpath.should_ignore_file(relpath_u): - return None - - self._pending.add(relpath_u) - d.addCallback(_add_pending) - def _add_pending(ign): - self._log("adding %r" % (self._pending)) - self._deque.extend(self._pending) - d.addCallback(_add_pending) - return d + self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child) def is_pending(self, relpath_u): return relpath_u in self._pending @@ -291,21 +300,14 @@ class Uploader(QueueMixin): self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,)) return if relpath_u in self._pending: - self._log("ignoring event for %r (already pending)" % (relpath_u,)) + self._log("not queueing %r because it is already pending" % (relpath_u,)) return if magicpath.should_ignore_file(relpath_u): self._log("ignoring event for %r (ignorable path)" % (relpath_u,)) return - self._log("appending %r to deque" % (relpath_u,)) - self._deque.append(relpath_u) self._pending.add(relpath_u) - self._count('objects_queued') - if self.is_ready: - if self._immediate: # for tests - self._turn_deque() - else: - self._clock.callLater(0, self._turn_deque) + self._extend_queue_and_keep_going([relpath_u]) def _when_queue_is_empty(self): return defer.succeed(None) @@ -539,8 +541,8 @@ class Downloader(QueueMixin, WriteFileMixin): self._is_upload_pending = is_upload_pending self._umask = umask - def start_scanning(self): - self._log("start_scanning") + def start_downloading(self): + self._log("start_downloading") files = self._db.get_all_relpaths() self._log("all files %s" % files)