]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/frontends/magic_folder.py
Daira's excellent fixes to the uploader from our pairing session
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
index 317500a935a51ede05140dbfed04539b338e7b5c..315ee70470da411ad09a828c1ef1ad3d2293e8c2 100644 (file)
@@ -56,7 +56,7 @@ def is_new_file(pathinfo, db_entry):
 class MagicFolder(service.MultiService):
     name = 'magic-folder'
 
-    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
+    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                  pending_delay=1.0, clock=None):
         precondition_abspath(local_path_u)
 
@@ -77,7 +77,7 @@ class MagicFolder(service.MultiService):
 
         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
-                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending)
+                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
 
     def startService(self):
         # TODO: why is this being called more than once?
@@ -91,10 +91,8 @@ class MagicFolder(service.MultiService):
         """ready is used to signal us to start
         processing the upload and download items...
         """
-        d = self.uploader.start_scanning()
-        d2 = self.downloader.start_scanning()
-        d.addCallback(lambda ign: d2)
-        return d
+        self.uploader.start_uploading()  # synchronous
+        return self.downloader.start_downloading()
 
     def finish(self):
         print "finish"
@@ -194,7 +192,7 @@ class Uploader(QueueMixin):
         self._upload_dirnode = upload_dirnode
         self._inotify = get_inotify_module()
         self._notifier = self._inotify.INotify()
-        self._pending = set()
+        self._pending = set()  # of unicode relpaths
 
         if hasattr(self._notifier, 'set_pending_delay'):
             self._notifier.set_pending_delay(pending_delay)
@@ -231,22 +229,43 @@ class Uploader(QueueMixin):
         d.addCallback(lambda ign: self._lazy_tail)
         return d
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_uploading(self):
+        self._log("start_uploading")
         self.is_ready = True
-        self._pending = self._db.get_all_relpaths()
+
+        all_relpaths = self._db.get_all_relpaths()
+        self._log("all relpaths: %r" % (all_relpaths,))
+
+        for relpath_u in all_relpaths:
+            self._add_pending(relpath_u)
+
+        self._full_scan()
+        self._extend_queue_and_keep_going(self._pending)
+
+    def _extend_queue_and_keep_going(self, relpaths_u):
+        self._log("queueing %r" % (relpaths_u,))
+        self._deque.extend(relpaths_u)
+        self._count('objects_queued', len(relpaths_u))
+
+        if self.is_ready:
+            if self._immediate:  # for tests
+                self._turn_deque()
+            else:
+                self._clock.callLater(0, self._turn_deque)
+
+    def _full_scan(self):
+        print "FULL SCAN"
         self._log("all_files %r" % (self._pending))
-        d = self._scan(u"")
-        def _add_pending(ign):
-            # This adds all of the files that were in the db but not already processed
-            # (normally because they have been deleted on disk).
-            self._log("adding %r" % (self._pending))
-            self._deque.extend(self._pending)
-        d.addCallback(_add_pending)
-        d.addCallback(lambda ign: self._turn_deque())
-        return d
+        self._scan(u"")
+
+    def _add_pending(self, relpath_u):
+        if not magicpath.should_ignore_file(relpath_u):
+            self._pending.add(relpath_u)
 
     def _scan(self, reldir_u):
+        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
+        # Note that this doesn't add them to the deque -- that will
+
         self._log("scan %r" % (reldir_u,))
         fp = self._get_filepath(reldir_u)
         try:
@@ -258,24 +277,9 @@ class Uploader(QueueMixin):
             raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                             % quote_filepath(fp))
 
-        d = defer.succeed(None)
         for child in children:
             _assert(isinstance(child, unicode), child=child)
-            d.addCallback(lambda ign, child=child:
-                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
-            def _add_pending(relpath_u):
-                if magicpath.should_ignore_file(relpath_u):
-                    return None
-
-                self._pending.add(relpath_u)
-                return relpath_u
-            d.addCallback(_add_pending)
-            # This call to _process doesn't go through the deque, and probably should.
-            d.addCallback(self._process)
-            d.addBoth(self._call_hook, 'processed')
-            d.addErrback(log.err)
-
-        return d
+            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
 
     def is_pending(self, relpath_u):
         return relpath_u in self._pending
@@ -296,21 +300,14 @@ class Uploader(QueueMixin):
             self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
             return
         if relpath_u in self._pending:
-            self._log("ignoring event for %r (already pending)" % (relpath_u,))
+            self._log("not queueing %r because it is already pending" % (relpath_u,))
             return
         if magicpath.should_ignore_file(relpath_u):
             self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
             return
 
-        self._log("appending %r to deque" % (relpath_u,))
-        self._deque.append(relpath_u)
         self._pending.add(relpath_u)
-        self._count('objects_queued')
-        if self.is_ready:
-            if self._immediate:  # for tests
-                self._turn_deque()
-            else:
-                self._clock.callLater(0, self._turn_deque)
+        self._extend_queue_and_keep_going([relpath_u])
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
@@ -486,9 +483,11 @@ class WriteFileMixin(object):
         head, tail = os.path.split(abspath_u)
 
         old_mask = os.umask(self._umask)
-        fileutil.make_dirs(head, ~ self._umask)
-        fileutil.write(replacement_path_u, file_contents)
-        os.umask(old_mask)
+        try:
+            fileutil.make_dirs(head, (~ self._umask) & 0777)
+            fileutil.write(replacement_path_u, file_contents)
+        finally:
+            os.umask(old_mask)
 
         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
         if is_conflict:
@@ -527,7 +526,7 @@ class Downloader(QueueMixin, WriteFileMixin):
     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
 
     def __init__(self, client, local_path_u, db, collective_dirnode,
-                 upload_readonly_dircap, clock, is_upload_pending, umask = 0123):
+                 upload_readonly_dircap, clock, is_upload_pending, umask):
         QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
 
         if not IDirectoryNode.providedBy(collective_dirnode):
@@ -541,10 +540,9 @@ class Downloader(QueueMixin, WriteFileMixin):
         self._upload_readonly_dircap = upload_readonly_dircap
         self._is_upload_pending = is_upload_pending
         self._umask = umask
-        self._turn_delay = self.REMOTE_SCAN_INTERVAL
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_downloading(self):
+        self._log("start_downloading")
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
 
@@ -678,7 +676,7 @@ class Downloader(QueueMixin, WriteFileMixin):
         return d
 
     def _when_queue_is_empty(self):
-        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
+        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
         d.addBoth(self._logcb, "after _scan_remote_collective 1")
         d.addCallback(lambda ign: self._turn_deque())
         return d