]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/frontends/magic_folder.py
Minor cleanup and logging fix.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
index a40c8c56af021fe8bbbdfc559c67915b93d06943..766f2ae9a3c2e6077ab556da4870695aa68028c3 100644 (file)
@@ -21,7 +21,7 @@ from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
 from allmydata.immutable.upload import FileName, Data
 from allmydata import magicfolderdb, magicpath
 
-
+defer.setDebugging(True)
 IN_EXCL_UNLINK = 0x04000000L
 
 def get_inotify_module():
@@ -91,10 +91,8 @@ class MagicFolder(service.MultiService):
         """ready is used to signal us to start
         processing the upload and download items...
         """
-        d = self.uploader.start_scanning()
-        d2 = self.downloader.start_scanning()
-        d.addCallback(lambda ign: d2)
-        return d
+        self.uploader.start_uploading()  # synchronous
+        return self.downloader.start_downloading()
 
     def finish(self):
         print "finish"
@@ -158,22 +156,32 @@ class QueueMixin(HookMixin):
         #open("events", "ab+").write(msg)
 
     def _turn_deque(self):
-        self._log("_turn_deque")
-        if self._stopped:
-            self._log("stopped")
-            return
         try:
-            item = self._deque.pop()
-            self._log("popped %r" % (item,))
-            self._count('objects_queued', -1)
-        except IndexError:
-            self._log("deque is now empty")
-            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
-        else:
-            self._lazy_tail.addCallback(lambda ign: self._process(item))
-            self._lazy_tail.addBoth(self._call_hook, 'processed')
-            self._lazy_tail.addErrback(log.err)
-            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+            self._log("_turn_deque")
+            if self._stopped:
+                self._log("stopped")
+                return
+            try:
+                item = self._deque.pop()
+                self._log("popped %r" % (item,))
+                self._count('objects_queued', -1)
+            except IndexError:
+                self._log("deque is now empty")
+                self._lazy_tail.addBoth(self._logcb, "whawhat empty")
+                self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+                self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
+            else:
+                self._log("_turn_deque else clause")
+                self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
+                self._lazy_tail.addCallback(lambda ign: self._process(item))
+                self._lazy_tail.addBoth(self._logcb, "got past _process")
+                self._lazy_tail.addBoth(self._call_hook, 'processed')
+                self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
+                self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+                self._lazy_tail.addBoth(self._logcb, "got past deferLater")
+        except Exception as e:
+            self._log("---- turn deque exception %s" % (e,))
+            raise
 
 
 class Uploader(QueueMixin):
@@ -194,7 +202,9 @@ class Uploader(QueueMixin):
         self._upload_dirnode = upload_dirnode
         self._inotify = get_inotify_module()
         self._notifier = self._inotify.INotify()
-        self._pending = set()
+        self._pending = set()  # of unicode relpaths
+
+        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes
 
         if hasattr(self._notifier, 'set_pending_delay'):
             self._notifier.set_pending_delay(pending_delay)
@@ -224,6 +234,7 @@ class Uploader(QueueMixin):
         self._log("stop")
         self._notifier.stopReading()
         self._count('dirs_monitored', -1)
+        self.periodic_callid.cancel()
         if hasattr(self._notifier, 'wait_until_stopped'):
             d = self._notifier.wait_until_stopped()
         else:
@@ -231,22 +242,45 @@ class Uploader(QueueMixin):
         d.addCallback(lambda ign: self._lazy_tail)
         return d
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_uploading(self):
+        self._log("start_uploading")
         self.is_ready = True
-        self._pending = self._db.get_all_relpaths()
-        self._log("all_files %r" % (self._pending))
-        d = self._scan(u"")
-        def _add_pending(ign):
-            # This adds all of the files that were in the db but not already processed
-            # (normally because they have been deleted on disk).
-            self._log("adding %r" % (self._pending))
-            self._deque.extend(self._pending)
-        d.addCallback(_add_pending)
-        d.addCallback(lambda ign: self._turn_deque())
-        return d
+
+        all_relpaths = self._db.get_all_relpaths()
+        self._log("all relpaths: %r" % (all_relpaths,))
+
+        for relpath_u in all_relpaths:
+            self._add_pending(relpath_u)
+
+        self._full_scan()
+
+    def _extend_queue_and_keep_going(self, relpaths_u):
+        self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
+        self._deque.extend(relpaths_u)
+        self._count('objects_queued', len(relpaths_u))
+
+        if self.is_ready:
+            if self._immediate:  # for tests
+                self._turn_deque()
+            else:
+                self._clock.callLater(0, self._turn_deque)
+
+    def _full_scan(self):
+        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
+        print "FULL SCAN"
+        self._log("_pending %r" % (self._pending))
+        self._scan(u"")
+        self._extend_queue_and_keep_going(self._pending)
+
+    def _add_pending(self, relpath_u):
+        self._log("add pending %r" % (relpath_u,))        
+        if not magicpath.should_ignore_file(relpath_u):
+            self._pending.add(relpath_u)
 
     def _scan(self, reldir_u):
+        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
+        # Note that this doesn't add them to the deque -- that will
+
         self._log("scan %r" % (reldir_u,))
         fp = self._get_filepath(reldir_u)
         try:
@@ -258,24 +292,9 @@ class Uploader(QueueMixin):
             raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                             % quote_filepath(fp))
 
-        d = defer.succeed(None)
         for child in children:
             _assert(isinstance(child, unicode), child=child)
-            d.addCallback(lambda ign, child=child:
-                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
-            def _add_pending(relpath_u):
-                if magicpath.should_ignore_file(relpath_u):
-                    return None
-
-                self._pending.add(relpath_u)
-                return relpath_u
-            d.addCallback(_add_pending)
-            # This call to _process doesn't go through the deque, and probably should.
-            d.addCallback(self._process)
-            d.addBoth(self._call_hook, 'processed')
-            d.addErrback(log.err)
-
-        return d
+            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
 
     def is_pending(self, relpath_u):
         return relpath_u in self._pending
@@ -296,21 +315,14 @@ class Uploader(QueueMixin):
             self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
             return
         if relpath_u in self._pending:
-            self._log("ignoring event for %r (already pending)" % (relpath_u,))
+            self._log("not queueing %r because it is already pending" % (relpath_u,))
             return
         if magicpath.should_ignore_file(relpath_u):
             self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
             return
 
-        self._log("appending %r to deque" % (relpath_u,))
-        self._deque.append(relpath_u)
         self._pending.add(relpath_u)
-        self._count('objects_queued')
-        if self.is_ready:
-            if self._immediate:  # for tests
-                self._turn_deque()
-            else:
-                self._clock.callLater(0, self._turn_deque)
+        self._extend_queue_and_keep_going([relpath_u])
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
@@ -325,7 +337,8 @@ class Uploader(QueueMixin):
 
         d = defer.succeed(None)
 
-        def _maybe_upload(val, now=None):
+        def _maybe_upload(ign, now=None):
+            self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
             if now is None:
                 now = time.time()
             fp = self._get_filepath(relpath_u)
@@ -377,6 +390,7 @@ class Uploader(QueueMixin):
                 self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                 return None
             elif pathinfo.isdir:
+                print "ISDIR "
                 if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                     self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
 
@@ -384,14 +398,15 @@ class Uploader(QueueMixin):
                 encoded_path_u += magicpath.path2magic(u"/")
                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
                 upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
-                def _succeeded(ign):
+                def _dir_succeeded(ign):
                     self._log("created subdirectory %r" % (relpath_u,))
                     self._count('directories_created')
-                def _failed(f):
+                def _dir_failed(f):
                     self._log("failed to create subdirectory %r" % (relpath_u,))
                     return f
-                upload_d.addCallbacks(_succeeded, _failed)
+                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                 upload_d.addCallback(lambda ign: self._scan(relpath_u))
+                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                 return upload_d
             elif pathinfo.isfile:
                 db_entry = self._db.get_db_entry(relpath_u)
@@ -486,9 +501,11 @@ class WriteFileMixin(object):
         head, tail = os.path.split(abspath_u)
 
         old_mask = os.umask(self._umask)
-        fileutil.make_dirs(head, ~ self._umask)
-        fileutil.write(replacement_path_u, file_contents)
-        os.umask(old_mask)
+        try:
+            fileutil.make_dirs(head, (~ self._umask) & 0777)
+            fileutil.write(replacement_path_u, file_contents)
+        finally:
+            os.umask(old_mask)
 
         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
         if is_conflict:
@@ -541,10 +558,9 @@ class Downloader(QueueMixin, WriteFileMixin):
         self._upload_readonly_dircap = upload_readonly_dircap
         self._is_upload_pending = is_upload_pending
         self._umask = umask
-        self._turn_delay = self.REMOTE_SCAN_INTERVAL
 
-    def start_scanning(self):
-        self._log("start_scanning")
+    def start_downloading(self):
+        self._log("start_downloading")
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
 
@@ -678,7 +694,7 @@ class Downloader(QueueMixin, WriteFileMixin):
         return d
 
     def _when_queue_is_empty(self):
-        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
+        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
         d.addBoth(self._logcb, "after _scan_remote_collective 1")
         d.addCallback(lambda ign: self._turn_deque())
         return d