]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Enforce that paths are below the magic folder directory -- WIP.
authorDaira Hopwood <daira@jacaranda.org>
Thu, 17 Sep 2015 14:43:09 +0000 (15:43 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Thu, 17 Sep 2015 14:43:09 +0000 (15:43 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/backupdb.py
src/allmydata/frontends/magic_folder.py

index a51356cae31803b57ae0322dcd50d0f95ced61ca..0263717a16b1d58e6642a8250616199d47ba3030 100644 (file)
@@ -368,34 +368,33 @@ class BackupDB:
 class MagicFolderDB(BackupDB):
     VERSION = 3
 
-    def get_all_files(self):
-        """Retreive a list of all files that have had an entry in magic-folder db
-        (files that have been downloaded at least once).
+    def get_all_relpaths(self):
+        """
+        Retrieve a list of all relpaths of files that have had an entry in magic folder db
+        (i.e. that have been downloaded at least once).
         """
         self.cursor.execute("SELECT path FROM local_files")
         rows = self.cursor.fetchall()
-        if not rows:
-            return None
-        else:
-            return rows
+        return set([r[0] for r in rows])
 
-    def get_local_file_version(self, path):
-        """I will tell you the version of a local file tracked by our magic folder db.
-        If no db entry found then I'll return None.
+    def get_local_file_version(self, relpath_u):
+        """
+        Return the version of a local file tracked by our magic folder db.
+        If no db entry is found then return None.
         """
         c = self.cursor
         c.execute("SELECT version, fileid"
                   " FROM local_files"
                   " WHERE path=?",
-                  (path,))
+                  (relpath_u,))
         row = self.cursor.fetchone()
         if not row:
             return None
         else:
             return row[0]
 
-    def did_upload_file(self, filecap, path, version, mtime, ctime, size):
-        #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, path, version, mtime, ctime, size)
+    def did_upload_file(self, filecap, relpath_u, version, mtime, ctime, size):
+        #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, relpath_u, version, mtime, ctime, size)
         now = time.time()
         fileid = self.get_or_allocate_fileid_for_cap(filecap)
         try:
@@ -408,12 +407,12 @@ class MagicFolderDB(BackupDB):
                                 (now, now, fileid))
         try:
             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
-                                (path, size, mtime, ctime, fileid, version))
+                                (relpath_u, size, mtime, ctime, fileid, version))
         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
             self.cursor.execute("UPDATE local_files"
                                 " SET size=?, mtime=?, ctime=?, fileid=?, version=?"
                                 " WHERE path=?",
-                                (size, mtime, ctime, fileid, version, path))
+                                (size, mtime, ctime, fileid, version, relpath_u))
         self.connection.commit()
 
     def is_new_file_time(self, path, relpath_u):
index a440e6a716330a6707943330f0a62414eea2d9e2..bb7a00ded81231be2c40c3f5f1dbfcca78f3ef1e 100644 (file)
@@ -1,5 +1,5 @@
 
-import sys, os, stat
+import sys, os
 import os.path
 from collections import deque
 import time
@@ -96,17 +96,17 @@ class QueueMixin(HookMixin):
     def __init__(self, client, local_path_u, db, name):
         self._client = client
         self._local_path_u = local_path_u
-        self._local_path = to_filepath(local_path_u)
+        self._local_filepath = to_filepath(local_path_u)
         self._db = db
         self._name = name
         self._hooks = {'processed': None, 'started': None}
         self.started_d = self.set_hook('started')
 
-        if not self._local_path.exists():
+        if not self._local_filepath.exists():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but there is no directory at that location."
                                  % quote_local_unicode_path(self._local_path_u))
-        if not self._local_path.isdir():
+        if not self._local_filepath.isdir():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but the thing at that location is not a directory."
                                  % quote_local_unicode_path(self._local_path_u))
@@ -117,6 +117,12 @@ class QueueMixin(HookMixin):
         self._stopped = False
         self._turn_delay = 0
 
+    def _get_abspath(self, relpath_u):
+        return unicode_from_filepath(self._local_filepath.preauthChild(relpath_u))
+
+    def _get_relpath(self, filepath):
+        return u"/".join(filepath.segmentsFrom(self._local_filepath))
+
     def _count(self, counter_name, delta=1):
         self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)
 
@@ -126,11 +132,12 @@ class QueueMixin(HookMixin):
         #print s
         #open("events", "ab+").write(msg)
 
-    def _append_to_deque(self, path):
-        if path in self._pending:
+    def _append_to_deque(self, relpath_u):
+        print "_append_to_deque(%r)" % (relpath_u,)
+        if relpath_u in self._pending:
             return
-        self._deque.append(path)
-        self._pending.add(path)
+        self._deque.append(relpath_u)
+        self._pending.add(relpath_u)
         self._count('objects_queued')
         if self.is_ready:
             reactor.callLater(0, self._turn_deque)
@@ -139,12 +146,12 @@ class QueueMixin(HookMixin):
         if self._stopped:
             return
         try:
-            item = self._deque.pop()
+            relpath_u = self._deque.pop()
         except IndexError:
             self._log("deque is now empty")
             self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
         else:
-            self._lazy_tail.addCallback(lambda ign: self._process(item))
+            self._lazy_tail.addCallback(lambda ign: self._process(relpath_u))
             self._lazy_tail.addBoth(self._call_hook, 'processed')
             self._lazy_tail.addErrback(log.err)
             self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
@@ -181,7 +188,7 @@ class Uploader(QueueMixin):
                     | self._inotify.IN_ONLYDIR
                     | IN_EXCL_UNLINK
                     )
-        self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
+        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                              recursive=True)
 
     def start_monitoring(self):
@@ -206,27 +213,34 @@ class Uploader(QueueMixin):
     def start_scanning(self):
         self._log("start_scanning")
         self.is_ready = True
-        all_files = self._db.get_all_files()
-        d = self._scan(self._local_path_u)
-        self._turn_deque()
+        self._pending = self._db.get_all_relpaths()
+        print "all_files %r" % (self._pending)
+        d = self._scan(u"")
+        def _add_pending(ign):
+            # This adds all of the files that were in the db but not already processed
+            # (normally because they have been deleted on disk).
+            print "adding %r" % (self._pending)
+            self._deque.extend(self._pending)
+        d.addCallback(_add_pending)
+        d.addCallback(lambda ign: self._turn_deque())
         return d
 
-    def _scan(self, local_path_u):  # XXX should this take a FilePath?
-        self._log("scan %r" % (local_path_u))
-        if not os.path.isdir(local_path_u):
-            raise AssertionError("Programmer error: _scan() must be passed a directory path.")
-        quoted_path = quote_local_unicode_path(local_path_u)
+    def _scan(self, reldir_u):
+        self._log("scan %r" % (reldir_u,))
+        abspath_u = self._get_abspath(reldir_u)
         try:
-            children = listdir_unicode(local_path_u)
+            children = listdir_unicode(abspath_u)
         except EnvironmentError:
-            raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,)))
+            raise Exception("WARNING: magic folder: permission denied on directory %s"
+                            % quote_local_unicode_path(abspath_u))
         except FilenameEncodingError:
-            raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,)))
+            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
+                            % quote_local_unicode_path(abspath_u))
 
         d = defer.succeed(None)
         for child in children:
             assert isinstance(child, unicode), child
-            d.addCallback(lambda ign, child=child: os.path.join(local_path_u, child))
+            d.addCallback(lambda ign, child=child: os.path.join(reldir_u, child))
             d.addCallback(self._process_child)
             d.addErrback(log.err)
 
@@ -234,70 +248,70 @@ class Uploader(QueueMixin):
 
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
-        path_u = unicode_from_filepath(path)
-        self._append_to_deque(path_u)
+        relpath_u = self._get_relpath(path)
+        self._append_to_deque(relpath_u)
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
 
-    def _process_child(self, path_u):
-        precondition(isinstance(path_u, unicode), path_u)
+    def _process_child(self, relpath_u):
+        precondition(isinstance(relpath_u, unicode), relpath_u)
 
-        pathinfo = get_pathinfo(path_u)
+        abspath_u = self._get_abspath(relpath_u)
+        pathinfo = get_pathinfo(abspath_u)
 
         if pathinfo.islink:
-            self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(path_u))
+            self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(abspath_u))
             return None
         elif pathinfo.isdir:
             # process directories unconditionally
-            self._append_to_deque(path_u)
+            self._append_to_deque(relpath_u)
 
             # recurse on the child directory
-            return self._scan(path_u)
+            return self._scan(relpath_u)
         elif pathinfo.isfile:
-            file_version = self._db.get_local_file_version(path_u)
+            file_version = self._db.get_local_file_version(relpath_u)
             if file_version is None:
                 # XXX upload if we didn't record our version in magicfolder db?
-                self._append_to_deque(path_u)
+                self._append_to_deque(relpath_u)
                 return None
             else:
-                d2 = self._get_collective_latest_file(path_u)
+                d2 = self._get_collective_latest_file(relpath_u)
                 def _got_latest_file((file_node, metadata)):
                     collective_version = metadata['version']
                     if collective_version is None:
                         return None
                     if file_version > collective_version:
-                        self._append_to_upload_deque(path_u)
+                        self._append_to_upload_deque(relpath_u)
                     elif file_version < collective_version: # FIXME Daira thinks this is wrong
                         # if a collective version of the file is newer than ours
                         # we must download it and unlink the old file from our upload dirnode
-                        self._append_to_download_deque(path_u)
+                        self._append_to_download_deque(relpath_u)
                         # XXX where should we save the returned deferred?
-                        return self._upload_dirnode.delete(path_u, must_be_file=True)
+                        return self._upload_dirnode.delete(relpath_u, must_be_file=True)
                     else:
                         # XXX same version. do nothing.
                         pass
                 d2.addCallback(_got_latest_file)
                 return d2
         else:
-            self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(path_u))
+            self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(abspath_u))
             return None
 
-    def _process(self, path_u):
-        precondition(isinstance(path_u, unicode), path_u)
+    def _process(self, relpath_u):
+        precondition(isinstance(relpath_u, unicode), relpath_u)
 
         d = defer.succeed(None)
 
         def _maybe_upload(val):
-            pathinfo = get_pathinfo(path_u)
+            abspath_u = self._get_abspath(relpath_u)
+            pathinfo = get_pathinfo(abspath_u)
 
-            self._pending.remove(path_u)  # FIXME make _upload_pending hold relative paths
-            relpath_u = os.path.relpath(path_u, self._local_path_u)
+            self._pending.remove(relpath_u)
             encoded_name_u = magicpath.path2magic(relpath_u)
 
             if not pathinfo.exists:
-                self._log("drop-upload: notified object %r disappeared "
-                          "(this is normal for temporary objects)" % (path_u,))
+                self._log("notified object %s disappeared (this is normal)" % quote_local_unicode_path(abspath_u))
                 self._count('objects_disappeared')
                 d2 = defer.succeed(None)
                 if self._db.check_file_db_exists(relpath_u):
@@ -323,31 +337,30 @@ class Uploader(QueueMixin):
                 d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
                 return d2
             elif pathinfo.islink:
-                self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(path_u))
+                self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(abspath_u))
                 return None
             elif pathinfo.isdir:
-                self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
+                self._notifier.watch(to_filepath(abspath_u), mask=self.mask, callbacks=[self._notify], recursive=True)
                 uploadable = Data("", self._client.convergence)
                 encoded_name_u += u"@_"
                 upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
                 def _succeeded(ign):
-                    self._log("created subdirectory %r" % (path_u,))
+                    self._log("created subdirectory %r" % (relpath_u,))
                     self._count('directories_created')
                 def _failed(f):
-                    self._log("failed to create subdirectory %r" % (path_u,))
+                    self._log("failed to create subdirectory %r" % (relpath_u,))
                     return f
                 upload_d.addCallbacks(_succeeded, _failed)
-                upload_d.addCallback(lambda ign: self._scan(path_u))
+                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                 return upload_d
             elif pathinfo.isfile:
                 version = self._db.get_local_file_version(relpath_u)
                 if version is None:
                     version = 0
-                else:
-                    if self._db.is_new_file_time(os.path.join(self._local_path_u, relpath_u), relpath_u):
-                        version += 1
+                elif self._db.is_new_file_time(abspath_u, relpath_u):
+                    version += 1
 
-                uploadable = FileName(path_u, self._client.convergence)
+                uploadable = FileName(abspath_u, self._client.convergence)
                 d2 = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
                 def add_db_entry(filenode):
                     filecap = filenode.get_uri()
@@ -358,7 +371,7 @@ class Uploader(QueueMixin):
                 d2.addCallback(add_db_entry)
                 return d2
             else:
-                self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(path_u))
+                self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(abspath_u))
                 return None
 
         d.addCallback(_maybe_upload)
@@ -370,7 +383,7 @@ class Uploader(QueueMixin):
         def _failed(f):
             self._count('objects_queued', -1)
             self._count('objects_failed')
-            self._log("%r while processing %r" % (f, path_u))
+            self._log("%r while processing %r" % (f, relpath_u))
             return f
         d.addCallbacks(_succeeded, _failed)
         return d
@@ -407,7 +420,7 @@ class Downloader(QueueMixin):
 
     def start_scanning(self):
         self._log("\nstart_scanning")
-        files = self._db.get_all_files()
+        files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
 
         d = self._scan_remote_collective()
@@ -429,17 +442,20 @@ class Downloader(QueueMixin):
         v = self._db.get_local_file_version(relpath_u)
         return (v is None or v < remote_version)
 
-    def _get_local_latest(self, path_u):
-        """_get_local_latest takes a unicode path string checks to see if this file object
+    def _get_local_latest(self, relpath_u):
+        """
+        _get_local_latest takes a unicode path string checks to see if this file object
         exists in our magic-folder db; if not then return None
         else check for an entry in our magic-folder db and return the version number.
         """
-        if not os.path.exists(os.path.join(self._local_path_u,path_u)):
+        abspath_u = self._get_abspath(relpath_u)
+        if not os.path.exists(abspath_u):
             return None
-        return self._db.get_local_file_version(path_u)
+        return self._db.get_local_file_version(relpath_u)
 
     def _get_collective_latest_file(self, filename):
-        """_get_collective_latest_file takes a file path pointing to a file managed by
+        """
+        _get_collective_latest_file takes a file path pointing to a file managed by
         magic-folder and returns a deferred that fires with the two tuple containing a
         file node and metadata for the latest version of the file located in the
         magic-folder collective directory.
@@ -518,12 +534,12 @@ class Downloader(QueueMixin):
 
     def _filter_scan_batch(self, result):
         extension = [] # consider whether this should be a dict
-        for name in self._download_scan_batch.keys():
-            if name in self._pending:
+        for relpath_u in self._download_scan_batch.keys():
+            if relpath_u in self._pending:
                 continue
-            file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
-            if self._should_download(name, metadata['version']):
-                extension += [(name, file_node, metadata)]
+            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
+            if self._should_download(relpath_u, metadata['version']):
+                extension += [(relpath_u, file_node, metadata)]
         return extension
 
     def _when_queue_is_empty(self):
@@ -532,22 +548,19 @@ class Downloader(QueueMixin):
         return d
 
     def _process(self, item):
-        (name, file_node, metadata) = item
+        (relpath_u, file_node, metadata) = item
         d = file_node.download_best_version()
         def succeeded(res):
+            abspath_u = self._get_abspath(relpath_u)
             d2 = defer.succeed(res)
-            absname = fileutil.abspath_expanduser_unicode(name, base=self._local_path_u)
-            d2.addCallback(lambda result: self._write_downloaded_file(absname, result, is_conflict=False))
-            def do_update_db(full_path):
+            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
+            def do_update_db(written_abspath_u):
                 filecap = file_node.get_uri()
-                try:
-                    s = os.stat(full_path)
-                except:
-                    raise(Exception("wtf downloaded file %s disappeared" % full_path))
-                size = s[stat.ST_SIZE]
-                ctime = s[stat.ST_CTIME]
-                mtime = s[stat.ST_MTIME]
-                self._db.did_upload_file(filecap, name, metadata['version'], mtime, ctime, size)
+                pathinfo = get_pathinfo(written_abspath_u)
+                if not pathinfo.exists:
+                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))
+                self._db.did_upload_file(filecap, relpath_u, metadata['version'],
+                                         pathinfo.mtime, pathinfo.ctime, pathinfo.size)
             d2.addCallback(do_update_db)
             # XXX handle failure here with addErrback...
             self._count('objects_downloaded')
@@ -558,7 +571,7 @@ class Downloader(QueueMixin):
             return f
         d.addCallbacks(succeeded, failed)
         def remove_from_pending(res):
-            self._pending.remove(name)
+            self._pending.remove(relpath_u)
             return res
         d.addBoth(remove_from_pending)
         return d
@@ -566,7 +579,7 @@ class Downloader(QueueMixin):
     FUDGE_SECONDS = 10.0
 
     @classmethod
-    def _write_downloaded_file(cls, path, file_contents, is_conflict=False, now=None):
+    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
         # 1. Write a temporary file, say .foo.tmp.
         # 2. is_conflict determines whether this is an overwrite or a conflict.
         # 3. Set the mtime of the replacement file to be T seconds before the
@@ -577,25 +590,25 @@ class Downloader(QueueMixin):
         #
         # Returns the path of the destination file.
 
-        precondition_abspath(path)
-        replacement_path = path + u".tmp"  # FIXME more unique
-        backup_path = path + u".backup"
+        precondition_abspath(abspath_u)
+        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
+        backup_path_u = abspath_u + u".backup"
         if now is None:
             now = time.time()
 
-        fileutil.write(replacement_path, file_contents)
-        os.utime(replacement_path, (now, now - cls.FUDGE_SECONDS))
+        fileutil.write(replacement_path_u, file_contents)
+        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
         if is_conflict:
-            return cls._rename_conflicted_file(path, replacement_path)
+            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
         else:
             try:
-                fileutil.replace_file(path, replacement_path, backup_path)
-                return path
+                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
+                return abspath_u
             except fileutil.ConflictError:
-                return cls._rename_conflicted_file(path, replacement_path)
+                return cls._rename_conflicted_file(abspath_u, replacement_path_u)
 
     @classmethod
-    def _rename_conflicted_file(self, path, replacement_path):
-        conflict_path = path + u".conflict"
-        fileutil.rename_no_overwrite(replacement_path, conflict_path)
-        return conflict_path
+    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
+        conflict_path_u = abspath_u + u".conflict"
+        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
+        return conflict_path_u