]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Daira's excellent fixes to the uploader from our pairing session
authorDavid Stainton <dstainton415@gmail.com>
Thu, 17 Dec 2015 16:19:26 +0000 (16:19 +0000)
committerDaira Hopwood <daira@jacaranda.org>
Tue, 29 Dec 2015 15:17:07 +0000 (15:17 +0000)
src/allmydata/frontends/magic_folder.py

index 06cebf290259d49551b2c35d0bc7d8804a76aa63..315ee70470da411ad09a828c1ef1ad3d2293e8c2 100644 (file)
@@ -91,10 +91,8 @@ class MagicFolder(service.MultiService):
         """ready is used to signal us to start
         processing the upload and download items...
         """
-        d = self.uploader.start_scanning()
-        d2 = self.downloader.start_scanning()
-        d.addCallback(lambda ign: d2)
-        return d
+        self.uploader.start_uploading()  # synchronous
+        return self.downloader.start_downloading()
 
     def finish(self):
         print "finish"
@@ -194,7 +192,7 @@ class Uploader(QueueMixin):
         self._upload_dirnode = upload_dirnode
         self._inotify = get_inotify_module()
         self._notifier = self._inotify.INotify()
-        self._pending = set()
+        self._pending = set()  # of unicode relpaths
 
         if hasattr(self._notifier, 'set_pending_delay'):
             self._notifier.set_pending_delay(pending_delay)
@@ -231,19 +229,43 @@ class Uploader(QueueMixin):
         d.addCallback(lambda ign: self._lazy_tail)
         return d
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_uploading(self):
+        self._log("start_uploading")
         self.is_ready = True
-        return self._full_scan()
+
+        all_relpaths = self._db.get_all_relpaths()
+        self._log("all relpaths: %r" % (all_relpaths,))
+
+        for relpath_u in all_relpaths:
+            self._add_pending(relpath_u)
+
+        self._full_scan()
+        self._extend_queue_and_keep_going(self._pending)
+
+    def _extend_queue_and_keep_going(self, relpaths_u):
+        self._log("queueing %r" % (relpaths_u,))
+        self._deque.extend(relpaths_u)
+        self._count('objects_queued', len(relpaths_u))
+
+        if self.is_ready:
+            if self._immediate:  # for tests
+                self._turn_deque()
+            else:
+                self._clock.callLater(0, self._turn_deque)
 
     def _full_scan(self):
         print "FULL SCAN"
-        self._pending = self._db.get_all_relpaths()
         self._log("all_files %r" % (self._pending))
-        d = self._scan(u"")
-        return d
+        self._scan(u"")
+
+    def _add_pending(self, relpath_u):
+        if not magicpath.should_ignore_file(relpath_u):
+            self._pending.add(relpath_u)
 
     def _scan(self, reldir_u):
+        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
+        # Note that this doesn't add them to the deque -- that will
+
         self._log("scan %r" % (reldir_u,))
         fp = self._get_filepath(reldir_u)
         try:
@@ -255,22 +277,9 @@ class Uploader(QueueMixin):
             raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                             % quote_filepath(fp))
 
-        d = defer.succeed(None)
         for child in children:
             _assert(isinstance(child, unicode), child=child)
-            d.addCallback(lambda ign, child=child:
-                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
-            def _add_pending(relpath_u):
-                if magicpath.should_ignore_file(relpath_u):
-                    return None
-
-                self._pending.add(relpath_u)
-            d.addCallback(_add_pending)
-        def _add_pending(ign):
-            self._log("adding %r" % (self._pending))
-            self._deque.extend(self._pending)
-        d.addCallback(_add_pending)
-        return d
+            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
 
     def is_pending(self, relpath_u):
         return relpath_u in self._pending
@@ -291,21 +300,14 @@ class Uploader(QueueMixin):
             self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
             return
         if relpath_u in self._pending:
-            self._log("ignoring event for %r (already pending)" % (relpath_u,))
+            self._log("not queueing %r because it is already pending" % (relpath_u,))
             return
         if magicpath.should_ignore_file(relpath_u):
             self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
             return
 
-        self._log("appending %r to deque" % (relpath_u,))
-        self._deque.append(relpath_u)
         self._pending.add(relpath_u)
-        self._count('objects_queued')
-        if self.is_ready:
-            if self._immediate:  # for tests
-                self._turn_deque()
-            else:
-                self._clock.callLater(0, self._turn_deque)
+        self._extend_queue_and_keep_going([relpath_u])
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
@@ -539,8 +541,8 @@ class Downloader(QueueMixin, WriteFileMixin):
         self._is_upload_pending = is_upload_pending
         self._umask = umask
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_downloading(self):
+        self._log("start_downloading")
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)