]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/frontends/magic_folder.py
Only perform full scan if pending set is empty
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
index 06cebf290259d49551b2c35d0bc7d8804a76aa63..abb0a1ff5e4d77252af98b29d0b8dde77a00933f 100644 (file)
@@ -91,10 +91,8 @@ class MagicFolder(service.MultiService):
         """ready is used to signal us to start
         processing the upload and download items...
         """
-        d = self.uploader.start_scanning()
-        d2 = self.downloader.start_scanning()
-        d.addCallback(lambda ign: d2)
-        return d
+        self.uploader.start_uploading()  # synchronous
+        return self.downloader.start_downloading()
 
     def finish(self):
         print "finish"
@@ -194,7 +192,9 @@ class Uploader(QueueMixin):
         self._upload_dirnode = upload_dirnode
         self._inotify = get_inotify_module()
         self._notifier = self._inotify.INotify()
-        self._pending = set()
+        self._pending = set()  # of unicode relpaths
+
+        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes
 
         if hasattr(self._notifier, 'set_pending_delay'):
             self._notifier.set_pending_delay(pending_delay)
@@ -224,6 +224,7 @@ class Uploader(QueueMixin):
         self._log("stop")
         self._notifier.stopReading()
         self._count('dirs_monitored', -1)
+        self.periodic_callid.cancel()
         if hasattr(self._notifier, 'wait_until_stopped'):
             d = self._notifier.wait_until_stopped()
         else:
@@ -231,19 +232,48 @@ class Uploader(QueueMixin):
         d.addCallback(lambda ign: self._lazy_tail)
         return d
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_uploading(self):
+        self._log("start_uploading")
         self.is_ready = True
-        return self._full_scan()
+
+        all_relpaths = self._db.get_all_relpaths()
+        self._log("all relpaths: %r" % (all_relpaths,))
+
+        for relpath_u in all_relpaths:
+            self._add_pending(relpath_u)
+
+        self._periodic_full_scan()
+        self._extend_queue_and_keep_going(self._pending)
+
+    def _extend_queue_and_keep_going(self, relpaths_u):
+        self._log("queueing %r" % (relpaths_u,))
+        self._deque.extend(relpaths_u)
+        self._count('objects_queued', len(relpaths_u))
+
+        if self.is_ready:
+            if self._immediate:  # for tests
+                self._turn_deque()
+            else:
+                self._clock.callLater(0, self._turn_deque)
+
+    def _periodic_full_scan(self):
+        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
+        if len(self._pending) == 0:
+            self._full_scan()
 
     def _full_scan(self):
         print "FULL SCAN"
-        self._pending = self._db.get_all_relpaths()
-        self._log("all_files %r" % (self._pending))
-        d = self._scan(u"")
-        return d
+        self._log("_pending %r" % (self._pending))
+        self._scan(u"")
+
+    def _add_pending(self, relpath_u):
+        if not magicpath.should_ignore_file(relpath_u):
+            self._pending.add(relpath_u)
 
     def _scan(self, reldir_u):
+        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
+        # Note that this doesn't add them to the deque -- that will
+
         self._log("scan %r" % (reldir_u,))
         fp = self._get_filepath(reldir_u)
         try:
@@ -255,22 +285,9 @@ class Uploader(QueueMixin):
             raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                             % quote_filepath(fp))
 
-        d = defer.succeed(None)
         for child in children:
             _assert(isinstance(child, unicode), child=child)
-            d.addCallback(lambda ign, child=child:
-                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
-            def _add_pending(relpath_u):
-                if magicpath.should_ignore_file(relpath_u):
-                    return None
-
-                self._pending.add(relpath_u)
-            d.addCallback(_add_pending)
-        def _add_pending(ign):
-            self._log("adding %r" % (self._pending))
-            self._deque.extend(self._pending)
-        d.addCallback(_add_pending)
-        return d
+            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
 
     def is_pending(self, relpath_u):
         return relpath_u in self._pending
@@ -291,21 +308,14 @@ class Uploader(QueueMixin):
             self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
             return
         if relpath_u in self._pending:
-            self._log("ignoring event for %r (already pending)" % (relpath_u,))
+            self._log("not queueing %r because it is already pending" % (relpath_u,))
             return
         if magicpath.should_ignore_file(relpath_u):
             self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
             return
 
-        self._log("appending %r to deque" % (relpath_u,))
-        self._deque.append(relpath_u)
         self._pending.add(relpath_u)
-        self._count('objects_queued')
-        if self.is_ready:
-            if self._immediate:  # for tests
-                self._turn_deque()
-            else:
-                self._clock.callLater(0, self._turn_deque)
+        self._extend_queue_and_keep_going([relpath_u])
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
@@ -379,14 +389,15 @@ class Uploader(QueueMixin):
                 encoded_path_u += magicpath.path2magic(u"/")
                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
                 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
-                def _succeeded(ign):
+                def _dir_succeeded(ign):
                     self._log("created subdirectory %r" % (relpath_u,))
                     self._count('directories_created')
-                def _failed(f):
+                def _dir_failed(f):
                     self._log("failed to create subdirectory %r" % (relpath_u,))
                     return f
-                upload_d.addCallbacks(_succeeded, _failed)
+                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                 upload_d.addCallback(lambda ign: self._scan(relpath_u))
+                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                 return upload_d
             elif pathinfo.isfile:
                 db_entry = self._db.get_db_entry(relpath_u)
@@ -539,8 +550,8 @@ class Downloader(QueueMixin, WriteFileMixin):
         self._is_upload_pending = is_upload_pending
         self._umask = umask
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_downloading(self):
+        self._log("start_downloading")
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)