From: Daira Hopwood Date: Thu, 17 Sep 2015 14:43:09 +0000 (+0100) Subject: Enforce that paths are below the magic folder directory -- WIP. X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/statistics?a=commitdiff_plain;h=c43381e3572a91a0551ccf13177ef1fd31802251;p=tahoe-lafs%2Ftahoe-lafs.git Enforce that paths are below the magic folder directory -- WIP. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/backupdb.py b/src/allmydata/backupdb.py index a51356ca..0263717a 100644 --- a/src/allmydata/backupdb.py +++ b/src/allmydata/backupdb.py @@ -368,34 +368,33 @@ class BackupDB: class MagicFolderDB(BackupDB): VERSION = 3 - def get_all_files(self): - """Retreive a list of all files that have had an entry in magic-folder db - (files that have been downloaded at least once). + def get_all_relpaths(self): + """ + Retrieve a list of all relpaths of files that have had an entry in magic folder db + (i.e. that have been downloaded at least once). """ self.cursor.execute("SELECT path FROM local_files") rows = self.cursor.fetchall() - if not rows: - return None - else: - return rows + return set([r[0] for r in rows]) - def get_local_file_version(self, path): - """I will tell you the version of a local file tracked by our magic folder db. - If no db entry found then I'll return None. + def get_local_file_version(self, relpath_u): + """ + Return the version of a local file tracked by our magic folder db. + If no db entry is found then return None. """ c = self.cursor c.execute("SELECT version, fileid" " FROM local_files" " WHERE path=?", - (path,)) + (relpath_u,)) row = self.cursor.fetchone() if not row: return None else: return row[0] - def did_upload_file(self, filecap, path, version, mtime, ctime, size): - #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, path, version, mtime, ctime, size) + def did_upload_file(self, filecap, relpath_u, version, mtime, ctime, size): + #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, relpath_u, version, mtime, ctime, size) now = time.time() fileid = self.get_or_allocate_fileid_for_cap(filecap) try: @@ -408,12 +407,12 @@ class MagicFolderDB(BackupDB): (now, now, fileid)) try: self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)", - (path, size, mtime, ctime, fileid, version)) + (relpath_u, size, mtime, ctime, fileid, version)) except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError): self.cursor.execute("UPDATE local_files" " SET size=?, mtime=?, ctime=?, fileid=?, version=?" " WHERE path=?", - (size, mtime, ctime, fileid, version, path)) + (size, mtime, ctime, fileid, version, relpath_u)) self.connection.commit() def is_new_file_time(self, path, relpath_u): diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index a440e6a7..bb7a00de 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -1,5 +1,5 @@ -import sys, os, stat +import sys, os import os.path from collections import deque import time @@ -96,17 +96,17 @@ class QueueMixin(HookMixin): def __init__(self, client, local_path_u, db, name): self._client = client self._local_path_u = local_path_u - self._local_path = to_filepath(local_path_u) + self._local_filepath = to_filepath(local_path_u) self._db = db self._name = name self._hooks = {'processed': None, 'started': None} self.started_d = self.set_hook('started') - if not self._local_path.exists(): + if not self._local_filepath.exists(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but there is no directory at that location." % quote_local_unicode_path(self._local_path_u)) - if not self._local_path.isdir(): + if not self._local_filepath.isdir(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but the thing at that location is not a directory." % quote_local_unicode_path(self._local_path_u)) @@ -117,6 +117,12 @@ class QueueMixin(HookMixin): self._stopped = False self._turn_delay = 0 + def _get_abspath(self, relpath_u): + return unicode_from_filepath(self._local_filepath.preauthChild(relpath_u)) + + def _get_relpath(self, filepath): + return u"/".join(filepath.segmentsFrom(self._local_filepath)) + def _count(self, counter_name, delta=1): self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta) @@ -126,11 +132,12 @@ class QueueMixin(HookMixin): #print s #open("events", "ab+").write(msg) - def _append_to_deque(self, path): - if path in self._pending: + def _append_to_deque(self, relpath_u): + print "_append_to_deque(%r)" % (relpath_u,) + if relpath_u in self._pending: return - self._deque.append(path) - self._pending.add(path) + self._deque.append(relpath_u) + self._pending.add(relpath_u) self._count('objects_queued') if self.is_ready: reactor.callLater(0, self._turn_deque) @@ -139,12 +146,12 @@ class QueueMixin(HookMixin): if self._stopped: return try: - item = self._deque.pop() + relpath_u = self._deque.pop() except IndexError: self._log("deque is now empty") self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty()) else: - self._lazy_tail.addCallback(lambda ign: self._process(item)) + self._lazy_tail.addCallback(lambda ign: self._process(relpath_u)) self._lazy_tail.addBoth(self._call_hook, 'processed') self._lazy_tail.addErrback(log.err) self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque)) @@ -181,7 +188,7 @@ class Uploader(QueueMixin): | self._inotify.IN_ONLYDIR | IN_EXCL_UNLINK ) - self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify], + self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify], recursive=True) def start_monitoring(self): @@ -206,27 +213,34 @@ class Uploader(QueueMixin): def start_scanning(self): self._log("start_scanning") self.is_ready = True - all_files = self._db.get_all_files() - d = self._scan(self._local_path_u) - self._turn_deque() + self._pending = self._db.get_all_relpaths() + print "all_files %r" % (self._pending) + d = self._scan(u"") + def _add_pending(ign): + # This adds all of the files that were in the db but not already processed + # (normally because they have been deleted on disk). + print "adding %r" % (self._pending) + self._deque.extend(self._pending) + d.addCallback(_add_pending) + d.addCallback(lambda ign: self._turn_deque()) return d - def _scan(self, local_path_u): # XXX should this take a FilePath? - self._log("scan %r" % (local_path_u)) - if not os.path.isdir(local_path_u): - raise AssertionError("Programmer error: _scan() must be passed a directory path.") - quoted_path = quote_local_unicode_path(local_path_u) + def _scan(self, reldir_u): + self._log("scan %r" % (reldir_u,)) + abspath_u = self._get_abspath(reldir_u) try: - children = listdir_unicode(local_path_u) + children = listdir_unicode(abspath_u) except EnvironmentError: - raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,))) + raise Exception("WARNING: magic folder: permission denied on directory %s" + % quote_local_unicode_path(abspath_u)) except FilenameEncodingError: - raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,))) + raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" + % quote_local_unicode_path(abspath_u)) d = defer.succeed(None) for child in children: assert isinstance(child, unicode), child - d.addCallback(lambda ign, child=child: os.path.join(local_path_u, child)) + d.addCallback(lambda ign, child=child: os.path.join(reldir_u, child)) d.addCallback(self._process_child) d.addErrback(log.err) @@ -234,70 +248,70 @@ class Uploader(QueueMixin): def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) - path_u = unicode_from_filepath(path) - self._append_to_deque(path_u) + relpath_u = self._get_relpath(path) + self._append_to_deque(relpath_u) def _when_queue_is_empty(self): return defer.succeed(None) - def _process_child(self, path_u): - precondition(isinstance(path_u, unicode), path_u) + def _process_child(self, relpath_u): + precondition(isinstance(relpath_u, unicode), relpath_u) - pathinfo = get_pathinfo(path_u) + abspath_u = self._get_abspath(relpath_u) + pathinfo = get_pathinfo(abspath_u) if pathinfo.islink: - self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(path_u)) + self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(abspath_u)) return None elif pathinfo.isdir: # process directories unconditionally - self._append_to_deque(path_u) + self._append_to_deque(relpath_u) # recurse on the child directory - return self._scan(path_u) + return self._scan(relpath_u) elif pathinfo.isfile: - file_version = self._db.get_local_file_version(path_u) + file_version = self._db.get_local_file_version(relpath_u) if file_version is None: # XXX upload if we didn't record our version in magicfolder db? - self._append_to_deque(path_u) + self._append_to_deque(relpath_u) return None else: - d2 = self._get_collective_latest_file(path_u) + d2 = self._get_collective_latest_file(relpath_u) def _got_latest_file((file_node, metadata)): collective_version = metadata['version'] if collective_version is None: return None if file_version > collective_version: - self._append_to_upload_deque(path_u) + self._append_to_upload_deque(relpath_u) elif file_version < collective_version: # FIXME Daira thinks this is wrong # if a collective version of the file is newer than ours # we must download it and unlink the old file from our upload dirnode - self._append_to_download_deque(path_u) + self._append_to_download_deque(relpath_u) # XXX where should we save the returned deferred? - return self._upload_dirnode.delete(path_u, must_be_file=True) + return self._upload_dirnode.delete(relpath_u, must_be_file=True) else: # XXX same version. do nothing. pass d2.addCallback(_got_latest_file) return d2 else: - self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(path_u)) + self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(abspath_u)) return None - def _process(self, path_u): - precondition(isinstance(path_u, unicode), path_u) + def _process(self, relpath_u): + precondition(isinstance(relpath_u, unicode), relpath_u) d = defer.succeed(None) def _maybe_upload(val): - pathinfo = get_pathinfo(path_u) + abspath_u = self._get_abspath(relpath_u) + pathinfo = get_pathinfo(abspath_u) - self._pending.remove(path_u) # FIXME make _upload_pending hold relative paths - relpath_u = os.path.relpath(path_u, self._local_path_u) + self._pending.remove(relpath_u) encoded_name_u = magicpath.path2magic(relpath_u) if not pathinfo.exists: - self._log("drop-upload: notified object %r disappeared " - "(this is normal for temporary objects)" % (path_u,)) + self._log("notified object %s disappeared (this is normal)" % quote_local_unicode_path(abspath_u)) self._count('objects_disappeared') d2 = defer.succeed(None) if self._db.check_file_db_exists(relpath_u): @@ -323,31 +337,30 @@ class Uploader(QueueMixin): d2.addCallback(lambda x: Exception("file does not exist")) # FIXME wrong return d2 elif pathinfo.islink: - self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(path_u)) + self.warn("WARNING: cannot upload symlink %s" % quote_local_unicode_path(abspath_u)) return None elif pathinfo.isdir: - self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True) + self._notifier.watch(to_filepath(abspath_u), mask=self.mask, callbacks=[self._notify], recursive=True) uploadable = Data("", self._client.convergence) encoded_name_u += u"@_" upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True) def _succeeded(ign): - self._log("created subdirectory %r" % (path_u,)) + self._log("created subdirectory %r" % (relpath_u,)) self._count('directories_created') def _failed(f): - self._log("failed to create subdirectory %r" % (path_u,)) + self._log("failed to create subdirectory %r" % (relpath_u,)) return f upload_d.addCallbacks(_succeeded, _failed) - upload_d.addCallback(lambda ign: self._scan(path_u)) + upload_d.addCallback(lambda ign: self._scan(relpath_u)) return upload_d elif pathinfo.isfile: version = self._db.get_local_file_version(relpath_u) if version is None: version = 0 - else: - if self._db.is_new_file_time(os.path.join(self._local_path_u, relpath_u), relpath_u): - version += 1 + elif self._db.is_new_file_time(abspath_u, relpath_u): + version += 1 - uploadable = FileName(path_u, self._client.convergence) + uploadable = FileName(abspath_u, self._client.convergence) d2 = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True) def add_db_entry(filenode): filecap = filenode.get_uri() @@ -358,7 +371,7 @@ class Uploader(QueueMixin): d2.addCallback(add_db_entry) return d2 else: - self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(path_u)) + self.warn("WARNING: cannot process special file %s" % quote_local_unicode_path(abspath_u)) return None d.addCallback(_maybe_upload) @@ -370,7 +383,7 @@ class Uploader(QueueMixin): def _failed(f): self._count('objects_queued', -1) self._count('objects_failed') - self._log("%r while processing %r" % (f, path_u)) + self._log("%r while processing %r" % (f, relpath_u)) return f d.addCallbacks(_succeeded, _failed) return d @@ -407,7 +420,7 @@ class Downloader(QueueMixin): def start_scanning(self): self._log("\nstart_scanning") - files = self._db.get_all_files() + files = self._db.get_all_relpaths() self._log("all files %s" % files) d = self._scan_remote_collective() @@ -429,17 +442,20 @@ class Downloader(QueueMixin): v = self._db.get_local_file_version(relpath_u) return (v is None or v < remote_version) - def _get_local_latest(self, path_u): - """_get_local_latest takes a unicode path string checks to see if this file object + def _get_local_latest(self, relpath_u): + """ + _get_local_latest takes a unicode path string checks to see if this file object exists in our magic-folder db; if not then return None else check for an entry in our magic-folder db and return the version number. """ - if not os.path.exists(os.path.join(self._local_path_u,path_u)): + abspath_u = self._get_abspath(relpath_u) + if not os.path.exists(abspath_u): return None - return self._db.get_local_file_version(path_u) + return self._db.get_local_file_version(relpath_u) def _get_collective_latest_file(self, filename): - """_get_collective_latest_file takes a file path pointing to a file managed by + """ + _get_collective_latest_file takes a file path pointing to a file managed by magic-folder and returns a deferred that fires with the two tuple containing a file node and metadata for the latest version of the file located in the magic-folder collective directory. @@ -518,12 +534,12 @@ class Downloader(QueueMixin): def _filter_scan_batch(self, result): extension = [] # consider whether this should be a dict - for name in self._download_scan_batch.keys(): - if name in self._pending: + for relpath_u in self._download_scan_batch.keys(): + if relpath_u in self._pending: continue - file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version']) - if self._should_download(name, metadata['version']): - extension += [(name, file_node, metadata)] + file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version']) + if self._should_download(relpath_u, metadata['version']): + extension += [(relpath_u, file_node, metadata)] return extension def _when_queue_is_empty(self): @@ -532,22 +548,19 @@ class Downloader(QueueMixin): return d def _process(self, item): - (name, file_node, metadata) = item + (relpath_u, file_node, metadata) = item d = file_node.download_best_version() def succeeded(res): + abspath_u = self._get_abspath(relpath_u) d2 = defer.succeed(res) - absname = fileutil.abspath_expanduser_unicode(name, base=self._local_path_u) - d2.addCallback(lambda result: self._write_downloaded_file(absname, result, is_conflict=False)) - def do_update_db(full_path): + d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False)) + def do_update_db(written_abspath_u): filecap = file_node.get_uri() - try: - s = os.stat(full_path) - except: - raise(Exception("wtf downloaded file %s disappeared" % full_path)) - size = s[stat.ST_SIZE] - ctime = s[stat.ST_CTIME] - mtime = s[stat.ST_MTIME] - self._db.did_upload_file(filecap, name, metadata['version'], mtime, ctime, size) + pathinfo = get_pathinfo(written_abspath_u) + if not pathinfo.exists: + raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u)) + self._db.did_upload_file(filecap, relpath_u, metadata['version'], + pathinfo.mtime, pathinfo.ctime, pathinfo.size) d2.addCallback(do_update_db) # XXX handle failure here with addErrback... self._count('objects_downloaded') @@ -558,7 +571,7 @@ class Downloader(QueueMixin): return f d.addCallbacks(succeeded, failed) def remove_from_pending(res): - self._pending.remove(name) + self._pending.remove(relpath_u) return res d.addBoth(remove_from_pending) return d @@ -566,7 +579,7 @@ class Downloader(QueueMixin): FUDGE_SECONDS = 10.0 @classmethod - def _write_downloaded_file(cls, path, file_contents, is_conflict=False, now=None): + def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None): # 1. Write a temporary file, say .foo.tmp. # 2. is_conflict determines whether this is an overwrite or a conflict. # 3. Set the mtime of the replacement file to be T seconds before the @@ -577,25 +590,25 @@ class Downloader(QueueMixin): # # Returns the path of the destination file. - precondition_abspath(path) - replacement_path = path + u".tmp" # FIXME more unique - backup_path = path + u".backup" + precondition_abspath(abspath_u) + replacement_path_u = abspath_u + u".tmp" # FIXME more unique + backup_path_u = abspath_u + u".backup" if now is None: now = time.time() - fileutil.write(replacement_path, file_contents) - os.utime(replacement_path, (now, now - cls.FUDGE_SECONDS)) + fileutil.write(replacement_path_u, file_contents) + os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS)) if is_conflict: - return cls._rename_conflicted_file(path, replacement_path) + return cls._rename_conflicted_file(abspath_u, replacement_path_u) else: try: - fileutil.replace_file(path, replacement_path, backup_path) - return path + fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u) + return abspath_u except fileutil.ConflictError: - return cls._rename_conflicted_file(path, replacement_path) + return cls._rename_conflicted_file(abspath_u, replacement_path_u) @classmethod - def _rename_conflicted_file(self, path, replacement_path): - conflict_path = path + u".conflict" - fileutil.rename_no_overwrite(replacement_path, conflict_path) - return conflict_path + def _rename_conflicted_file(self, abspath_u, replacement_path_u): + conflict_path_u = abspath_u + u".conflict" + fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u) + return conflict_path_u