From: Daira Hopwood Date: Thu, 23 Jul 2015 23:27:26 +0000 (+0100) Subject: Refactor MagicFolder into Uploader and Downloader classes (WIP). refs ticket:2477 X-Git-Url: https://git.rkrishnan.org/simplejson/%22news.html/status?a=commitdiff_plain;h=045611f92e16ffbdf02c78e9b623d263ae3ccfad;p=tahoe-lafs%2Ftahoe-lafs.git Refactor MagicFolder into Uploader and Downloader classes (WIP). refs ticket:2477 Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 7532f2b1..68107f3f 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -46,55 +46,93 @@ class MagicFolder(service.MultiService): precondition_abspath(local_dir) service.MultiService.__init__(self) - self._stopped = False - self._remote_scan_delay = 3 # XXX - self._local_dir = local_dir - self._upload_lazy_tail = defer.succeed(None) - self._upload_pending = set() - self._download_scan_batch = {} # path -> [(filenode, metadata)] - self._download_lazy_tail = defer.succeed(None) - self._download_pending = set() - self._collective_dirnode = None - self._client = client - self._stats_provider = client.stats_provider - self._convergence = client.convergence - self._local_path = to_filepath(self._local_dir) - self._dbfile = dbfile + local_path = to_filepath(local_dir) - self._download_deque = deque() - self._upload_deque = deque() - self.is_ready = False + db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3)) + if db is None: + return Failure(Exception('ERROR: Unable to load magic folder db.')) - self._inotify = inotify or get_inotify_module() + self.is_ready = False - if not self._local_path.exists(): + if not local_path.exists(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but there is no directory at that location." % quote_local_unicode_path(local_dir)) - if not self._local_path.isdir(): + if not local_path.isdir(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but the thing at that location is not a directory." % quote_local_unicode_path(local_dir)) + self.uploader = Uploader(client, local_path, db, upload_dircap, inotify, pending_delay) + self.downloader = Downloader(client, local_path, db, collective_dircap) + + def startService(self): + service.MultiService.startService(self) + return self.uploader.start_monitoring() + + def ready(self): + """ready is used to signal us to start + processing the upload and download items... + """ + self.is_ready = True + self.uploader.start_scanning() + self.downloader.start_scanning() + + def finish(self): + d = self.uploader.stop() + d.addBoth(lambda ign: self.downloader.stop()) + return d + + def remove_service(self): + return service.MultiService.disownServiceParent(self) + + +class QueueMixin(object): + def __init__(self, client, counter, local_path, db): + self._client = client + self._counter = client.stats_provider.count + self._local_path = local_path + self._db = db + + self._deque = deque() + self._lazy_tail = defer.succeed(None) + self._pending = set() + self._processed_callback = lambda ign: None + self._ignore_count = 0 + + def _do_callback(self, res): + if self._ignore_count == 0: + self._callback(res) + else: + self._ignore_count -= 1 + return None # intentionally suppress failures, which have already been logged + + def set_callback(self, callback, ignore_count=0): + """ + set_callback sets a function that will be called after a filesystem change + (either local or remote) has been processed, successfully or unsuccessfully. + """ + self._callback = callback + self._ignore_count = ignore_count + + def _log(self, msg): + self._client.log("Magic Folder: " + msg) + #print "_log %s" % (msg,) + #open("events", "ab+").write(msg) + + +class Uploader(QueueMixin): + def __init__(self, client, local_path, db, upload_dircap, inotify, pending_delay): + QueueMixin.__init__(self, client, local_path, db) + # TODO: allow a path rather than a cap URI. self._upload_dirnode = self._client.create_node_from_uri(upload_dircap) - self._collective_dirnode = self._client.create_node_from_uri(collective_dircap) - if not IDirectoryNode.providedBy(self._upload_dirnode): raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.") if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly(): raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.") - if not IDirectoryNode.providedBy(self._collective_dirnode): - raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.") - if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): - raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") - - self._processed_callback = lambda ign: None - self._download_callback = lambda ign: None - self._upload_ignore_count = 0 - self._download_ignore_count = 0 - + self._inotify = inotify or get_inotify_module() self._notifier = self._inotify.INotify() if hasattr(self._notifier, 'set_pending_delay'): @@ -115,126 +153,24 @@ class MagicFolder(service.MultiService): self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify], recursive=True) - def _should_download(self, relpath_u, remote_version): - """ - _should_download returns a bool indicating whether or not a remote object should be downloaded. - We check the remote metadata version against our magic-folder db version number; - latest version wins. - """ - v = self._db.get_local_file_version(relpath_u) - return (v is None or v < remote_version) - - def _get_collective_latest_file(self, filename): - """_get_collective_latest_file takes a file path pointing to a file managed by - magic-folder and returns a deferred that fires with the two tuple containing a - file node and metadata for the latest version of the file located in the - magic-folder collective directory. - """ - upload_readonly_dircap = self._upload_dirnode.get_readonly_uri() - collective_dirmap_d = self._collective_dirnode.list() - def do_filter(result): - print result - others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap] - return result, others - collective_dirmap_d.addCallback(do_filter) - def scan_collective(result): - list_of_deferreds = [] - collective_dirmap, others_list = result - for dir_name in result: - # XXX make sure it's a directory - d = defer.succeed(None) - d.addCallback(lambda x, dir_name=dir_name: collective_dirmap[dir_name][0].get_child_and_metadata(filename)) - list_of_deferreds.append(d) - deferList = defer.DeferredList(list_of_deferreds) - return deferList - collective_dirmap_d.addCallback(scan_collective) - def highest_version(deferredList): - max_version = 0 - metadata = None - node = None - for success, result in deferredList: - if success: - if result[1]['version'] > max_version: - node, metadata = result - max_version = result[1]['version'] - return node, metadata - collective_dirmap_d.addCallback(highest_version) - return collective_dirmap_d + def start_monitoring(self): + d = self._notifier.startReading() + self._counter('magic_folder.dirs_monitored', 1) + return d - def _scan_remote(self, nickname, dirnode): - listing_d = dirnode.list() - self._download_scan_batch = {} - def scan_listing(listing_map): - for name in listing_map.keys(): - file_node, metadata = listing_map[name] - if self._download_scan_batch.has_key(name): - self._download_scan_batch[name] += [(file_node, metadata)] - else: - self._download_scan_batch[name] = [(file_node, metadata)] - listing_d.addCallback(scan_listing) - return listing_d + def stop(self): + self._notifier.stopReading() + self._counter('magic_folder.dirs_monitored', -1) - def _scan_remote_collective(self): - if self._collective_dirnode is None: - return - upload_readonly_dircap = self._upload_dirnode.get_readonly_uri() - collective_dirmap_d = self._collective_dirnode.list() - def do_filter(result): - others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap] - return result, others - collective_dirmap_d.addCallback(do_filter) - def scan_collective(result): + if hasattr(self._notifier, 'wait_until_stopped'): + d = self._notifier.wait_until_stopped() + else: d = defer.succeed(None) - collective_dirmap, others_list = result - for dir_name in others_list: - d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0])) - # XXX todo add errback - return d - collective_dirmap_d.addCallback(scan_collective) - collective_dirmap_d.addCallback(self._filter_scan_batch) - collective_dirmap_d.addCallback(self._add_batch_to_download_queue) - return collective_dirmap_d - - def _add_batch_to_download_queue(self, result): - self._download_deque.extend(result) - self._download_pending.update(map(lambda x: x[0], result)) - - def _filter_scan_batch(self, result): - extension = [] # consider whether this should be a dict - for name in self._download_scan_batch.keys(): - if name in self._download_pending: - continue - file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version']) - if self._should_download(name, metadata['version']): - extension += [(name, file_node, metadata)] - return extension - - def _download_file(self, name, file_node): - d = file_node.download_best_version() - def succeeded(res): - d.addCallback(lambda result: self._write_downloaded_file(name, result)) - self._stats_provider.count('magic_folder.objects_downloaded', 1) - return None - def failed(f): - self._log("download failed") - self._stats_provider.count('magic_folder.objects_download_failed', 1) - return f - def remove_from_pending(result): - self._download_pending = self._download_pending.difference(set([name])) - d.addCallbacks(succeeded, failed) - d.addBoth(self._do_download_callback) - d.addBoth(remove_from_pending) return d - def _write_downloaded_file(self, name, file_contents): - print "_write_downloaded_file: no-op." - - def _db_file_is_uploaded(self, childpath): - """_db_file_is_uploaded returns true if the file was previously uploaded - """ - assert self._db != None - r = self._db.check_file(childpath) - return r.was_uploaded() + def start_scanning(self): + self._scan(self._local_dir) + self._turn_deque() def _scan(self, localpath): if not os.path.isdir(localpath): @@ -263,7 +199,7 @@ class MagicFolder(service.MultiService): return None elif isdir: # process directories unconditionally - self._append_to_upload_deque(childpath) + self._append_to_deque(childpath) # recurse on the child directory return self._scan(childpath) @@ -271,7 +207,7 @@ class MagicFolder(service.MultiService): file_version = self._db.get_local_file_version(childpath) if file_version is None: # XXX upload if we didn't record our version in magicfolder db? - self._append_to_upload_deque(childpath) + self._append_to_deque(childpath) return None else: d2 = self._get_collective_latest_file(childpath) @@ -281,7 +217,7 @@ class MagicFolder(service.MultiService): return None if file_version > collective_version: self._append_to_upload_deque(childpath) - elif file_version < collective_version: + elif file_version < collective_version: # FIXME Daira thinks this is wrong # if a collective version of the file is newer than ours # we must download it and unlink the old file from our upload dirnode self._append_to_download_deque(childpath) @@ -300,72 +236,31 @@ class MagicFolder(service.MultiService): return d - def startService(self): - self._db = backupdb.get_backupdb(self._dbfile, create_version=(backupdb.SCHEMA_v3, 3)) - if self._db is None: - return Failure(Exception('ERROR: Unable to load magic folder db.')) - - service.MultiService.startService(self) - d = self._notifier.startReading() - self._stats_provider.count('magic_folder.dirs_monitored', 1) - return d - - def ready(self): - """ready is used to signal us to start - processing the upload and download items... - """ - self.is_ready = True - self._scan(self._local_dir) - self._scan_remote_collective() - self._turn_upload_deque() - self._turn_download_deque() - - def _turn_download_deque(self): - if self._stopped: - return - try: - file_path, file_node, metadata = self._download_deque.pop() - except IndexError: - self._log("magic folder upload deque is now empty") - self._download_lazy_tail = defer.succeed(None) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective)) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_download_deque)) - return - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque)) - - def _append_to_download_deque(self, path): - if path in self._download_scan_batch.keys(): - return - self._download_deque.append(path) - self._download_pending.add(path) - self._stats_provider.count('magic_folder.download_objects_queued', 1) - if self.is_ready: - reactor.callLater(0, self._turn_download_deque) - - def _append_to_upload_deque(self, path): - if path in self._upload_pending: + # FIXME move to QueueMixin + def _append_to_deque(self, path): + if path in self._pending: return - self._upload_deque.append(path) - self._upload_pending.add(path) - self._stats_provider.count('magic_folder.objects_queued', 1) + self._deque.append(path) + self._pending.add(path) + self._counter('magic_folder.objects_queued', 1) if self.is_ready: - reactor.callLater(0, self._turn_upload_deque) + reactor.callLater(0, self._turn_deque) - def _turn_upload_deque(self): + # FIXME move to QueueMixin + def _turn_deque(self): try: - path = self._upload_deque.pop() + path = self._deque.pop() except IndexError: self._log("magic folder upload deque is now empty") - self._upload_lazy_tail = defer.succeed(None) + self._lazy_tail = defer.succeed(None) return - self._upload_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path)) - self._upload_lazy_tail.addCallback(lambda ign: self._turn_upload_deque()) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path)) + self._lazy_tail.addCallback(lambda ign: self._turn_deque()) def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) path_u = unicode_from_filepath(path) - self._append_to_upload_deque(path_u) + self._append_to_deque(path_u) def _process(self, path_u): precondition(isinstance(path_u, unicode), path_u) @@ -382,7 +277,7 @@ class MagicFolder(service.MultiService): upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True) def _succeeded(ign): self._log("created subdirectory %r" % (path_u,)) - self._stats_provider.count('magic_folder.directories_created', 1) + self._counter('magic_folder.directories_created', 1) def _failed(f): self._log("failed to create subdirectory %r" % (path_u,)) return f @@ -405,7 +300,7 @@ class MagicFolder(service.MultiService): if not os.path.exists(path_u): self._log("drop-upload: notified object %r disappeared " "(this is normal for temporary objects)" % (path_u,)) - self._stats_provider.count('magic_folder.objects_disappeared', 1) + self._counter('magic_folder.objects_disappeared', 1) d2 = defer.succeed(None) if self._db.check_file_db_exists(relpath_u): d2.addCallback(get_metadata) @@ -413,8 +308,8 @@ class MagicFolder(service.MultiService): current_version = self._db.get_local_file_version(relpath_u) + 1 metadata['version'] = current_version metadata['deleted'] = True - emptyUploadable = Data("", self._convergence) - return self._upload_dirnode.add_file(encoded_name_u, emptyUploadable, overwrite=True, metadata=metadata) + empty_uploadable = Data("", self._convergence) + return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata) d2.addCallback(set_deleted) d2.addCallback(lambda x: Exception("file does not exist")) return d2 @@ -437,7 +332,7 @@ class MagicFolder(service.MultiService): ctime = s[stat.ST_CTIME] mtime = s[stat.ST_MTIME] self._db.did_upload_file(filecap, relpath_u, version, mtime, ctime, size) - self._stats_provider.count('magic_folder.files_uploaded', 1) + self._counter('magic_folder.files_uploaded', 1) d2.addCallback(add_db_entry) return d2 else: @@ -446,65 +341,176 @@ class MagicFolder(service.MultiService): d.addCallback(_maybe_upload) def _succeeded(res): - self._stats_provider.count('magic_folder.objects_queued', -1) - self._stats_provider.count('magic_folder.objects_succeeded', 1) + self._counter('magic_folder.objects_queued', -1) + self._counter('magic_folder.objects_succeeded', 1) return res def _failed(f): - self._stats_provider.count('magic_folder.objects_queued', -1) - self._stats_provider.count('magic_folder.objects_failed', 1) + self._counter('magic_folder.objects_queued', -1) + self._counter('magic_folder.objects_failed', 1) self._log("%r while processing %r" % (f, path_u)) return f d.addCallbacks(_succeeded, _failed) - d.addBoth(self._do_processed_callback) + d.addBoth(self._do_callback) return d - def _do_download_callback(self, res): - if self._download_ignore_count == 0: - self._download_callback(res) - else: - self._download_ignore_count -= 1 - return None # intentionally suppress failures, which have already been logged - def _do_processed_callback(self, res): - if self._upload_ignore_count == 0: - self._processed_callback(res) - else: - self._upload_ignore_count -= 1 - return None # intentionally suppress failures, which have already been logged +class Downloader(QueueMixin): + def __init__(self, client, local_path, db, collective_dircap): + QueueMixin.__init__(self, client, local_path, db) + + # TODO: allow a path rather than a cap URI. + self._collective_dirnode = self._client.create_node_from_uri(collective_dircap) + + if not IDirectoryNode.providedBy(self._collective_dirnode): + raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.") + if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): + raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") + + self._remote_scan_delay = 3 # XXX + self._download_scan_batch = {} # path -> [(filenode, metadata)] - def set_download_callback(self, callback, ignore_count=0): + def start_scanning(self): + self._scan_remote_collective() + self._turn_deque() + + def stop(self): + return self._lazy_tail + + def _should_download(self, relpath_u, remote_version): """ - set_download_callback sets a function that will be called after a - remote filesystem notification has been processed (successfully or unsuccessfully). + _should_download returns a bool indicating whether or not a remote object should be downloaded. + We check the remote metadata version against our magic-folder db version number; + latest version wins. """ - self._download_callback = callback - self._download_ignore_count = ignore_count + v = self._db.get_local_file_version(relpath_u) + return (v is None or v < remote_version) - def set_processed_callback(self, callback, ignore_count=0): - """ - set_processed_callback sets a function that will be called after a - local filesystem notification has been processed (successfully or unsuccessfully). + def _get_collective_latest_file(self, filename): + """_get_collective_latest_file takes a file path pointing to a file managed by + magic-folder and returns a deferred that fires with the two tuple containing a + file node and metadata for the latest version of the file located in the + magic-folder collective directory. """ - self._processed_callback = callback - self._upload_ignore_count = ignore_count + upload_readonly_dircap = self._upload_dirnode.get_readonly_uri() + collective_dirmap_d = self._collective_dirnode.list() + def do_filter(result): + print result + others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap] + return result, others + collective_dirmap_d.addCallback(do_filter) + def scan_collective(result): + list_of_deferreds = [] + collective_dirmap, others_list = result + for dir_name in result: + # XXX make sure it's a directory + d = defer.succeed(None) + d.addCallback(lambda x, dir_name=dir_name: collective_dirmap[dir_name][0].get_child_and_metadata(filename)) + list_of_deferreds.append(d) + deferList = defer.DeferredList(list_of_deferreds) + return deferList + collective_dirmap_d.addCallback(scan_collective) + def highest_version(deferredList): + max_version = 0 + metadata = None + node = None + for success, result in deferredList: + if success: + if result[1]['version'] > max_version: + node, metadata = result + max_version = result[1]['version'] + return node, metadata + collective_dirmap_d.addCallback(highest_version) + return collective_dirmap_d - def finish(self, for_tests=False): - self._stopped = True - self._notifier.stopReading() - self._stats_provider.count('magic_folder.dirs_monitored', -1) + def _scan_remote(self, nickname, dirnode): + listing_d = dirnode.list() + self._download_scan_batch = {} + def scan_listing(listing_map): + for name in listing_map.keys(): + file_node, metadata = listing_map[name] + if self._download_scan_batch.has_key(name): + self._download_scan_batch[name] += [(file_node, metadata)] + else: + self._download_scan_batch[name] = [(file_node, metadata)] + listing_d.addCallback(scan_listing) + return listing_d - if for_tests and hasattr(self._notifier, 'wait_until_stopped'): - d = self._notifier.wait_until_stopped() - else: + def _scan_remote_collective(self): + if self._collective_dirnode is None: + return + upload_readonly_dircap = self._upload_dirnode.get_readonly_uri() + collective_dirmap_d = self._collective_dirnode.list() + def do_filter(result): + others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap] + return result, others + collective_dirmap_d.addCallback(do_filter) + def scan_collective(result): d = defer.succeed(None) + collective_dirmap, others_list = result + for dir_name in others_list: + d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0])) + # XXX todo add errback + return d + collective_dirmap_d.addCallback(scan_collective) + collective_dirmap_d.addCallback(self._filter_scan_batch) + collective_dirmap_d.addCallback(self._add_batch_to_download_queue) + return collective_dirmap_d - d.addCallback(lambda x: self._download_lazy_tail) + def _add_batch_to_download_queue(self, result): + self._deque.extend(result) + self._pending.update(map(lambda x: x[0], result)) + + def _filter_scan_batch(self, result): + extension = [] # consider whether this should be a dict + for name in self._download_scan_batch.keys(): + if name in self._pending: + continue + file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version']) + if self._should_download(name, metadata['version']): + extension += [(name, file_node, metadata)] + return extension + + def _download_file(self, name, file_node): + d = file_node.download_best_version() + def succeeded(res): + d.addCallback(lambda result: self._write_downloaded_file(name, result)) + self._counter('magic_folder.objects_downloaded', 1) + return None + def failed(f): + self._log("download failed") + self._counter('magic_folder.objects_download_failed', 1) + return f + def remove_from_pending(result): + self._pending = self._pending.difference(set([name])) + d.addCallbacks(succeeded, failed) + d.addBoth(self._do_callback) + d.addBoth(remove_from_pending) return d - def remove_service(self): - return service.MultiService.disownServiceParent(self) + def _write_downloaded_file(self, name, file_contents): + print "_write_downloaded_file: no-op." - def _log(self, msg): - self._client.log("drop-upload: " + msg) - #print "_log %s" % (msg,) - #open("events", "ab+").write(msg) + # FIXME move to QueueMixin + def _append_to_deque(self, path): + if path in self._download_scan_batch.keys(): + return + self._deque.append(path) + self._pending.add(path) + self._counter('magic_folder.download_objects_queued', 1) + if self.is_ready: + reactor.callLater(0, self._turn_deque) + + # FIXME move to QueueMixin + def _turn_deque(self): + if self._stopped: + return + try: + file_path, file_node, metadata = self._deque.pop() + except IndexError: + self._log("magic folder upload deque is now empty") + self._lazy_tail = defer.succeed(None) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective)) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque)) + return + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))