From: David Stainton Date: Thu, 2 Jul 2015 04:11:05 +0000 (-0700) Subject: Add rough naive downloader + remote scan X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=dc807e1c0b72ae2b171fcd48b576a21ca50d9740;p=tahoe-lafs%2Ftahoe-lafs.git Add rough naive downloader + remote scan - makes the basic naive Alice + Bob unit test pass - `_should_download` is currently a stub function and should be implemented - handling of local+remote file versions is currently faked... and should be implemented --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 1175882c..9c47774a 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -520,7 +520,7 @@ class Client(node.Node, pollmixin.PollMixin): s.startService() # start processing the upload queue when we've connected to enough servers - self.upload_ready_d.addCallback(s.upload_ready) + self.upload_ready_d.addCallback(s.ready) except Exception, e: self.log("couldn't start Magic Folder: %r", args=(e,)) diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 8c304ef9..e587b59e 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -45,17 +45,23 @@ class MagicFolder(service.MultiService): precondition_abspath(local_dir) service.MultiService.__init__(self) + self._stopped = False + self._remote_scan_delay = 3 # XXX self._local_dir = abspath_expanduser_unicode(local_dir) self._upload_lazy_tail = defer.succeed(None) self._upload_pending = set() + self._download_scan_batch = {} + self._download_lazy_tail = defer.succeed(None) + self._download_pending = set() self._client = client self._stats_provider = client.stats_provider self._convergence = client.convergence self._local_path = to_filepath(self._local_dir) self._dbfile = dbfile + self._download_deque = deque() self._upload_deque = deque() - self.is_upload_ready = False + self.is_ready = False self._inotify = inotify or get_inotify_module() @@ -108,6 +114,29 @@ class MagicFolder(service.MultiService): recursive=True) + self._scan_remote_collective() + + def _should_download(self, path, remote_version): + """ + _should_download returns a bool indicating whether or not a remote object should be downloaded. + We check the remote metadata version against our magic-folder db version number; + latest version wins. + """ + # XXX todo + return True + + def _scan_remote(self, nickname, dirnode): + listing_d = dirnode.list() + def scan_listing(listing_map): + for name in listing_map.keys(): + file_node, metadata = listing_map[name] + if self._download_scan_batch.has_key(name): + self._download_scan_batch[name] += [(name, file_node, metadata)] + else: + self._download_scan_batch[name] = [(name, file_node, metadata)] + listing_d.addCallback(scan_listing) + return listing_d + def _scan_remote_collective(self): upload_readonly_dircap = self._upload_dirnode.get_readonly_uri() collective_dirmap_d = self._collective_dirnode.list() @@ -117,23 +146,53 @@ class MagicFolder(service.MultiService): others = filter(not_mine, result.keys()) return result, others collective_dirmap_d.addCallback(do_filter) - def do_scans(result): + def scan_collective(result): d = defer.succeed(None) - collective_dirmap, others = result - for dir_name in others: - d.addCallback(self._scan_remote(collective_dirmap[dir_name][0])) + collective_dirmap, others_list = result + for dir_name in others_list: + d.addCallback(lambda x: self._scan_remote(dir_name, collective_dirmap[dir_name][0])) return d - collective_dirmap_d.addCallback(do_scans) + collective_dirmap_d.addCallback(scan_collective) + collective_dirmap_d.addCallback(self._filter_scan_batch) + collective_dirmap_d.addCallback(self._add_batch_to_download_queue) return collective_dirmap_d - def _scan_remote(self, dirnode): - listing_d = dirnode.list() - def display_listing(result): - return result.keys() - listing_d.addCallback(display_listing) - # XXX ... - return listing_d - + def _add_batch_to_download_queue(self, result): + self._download_deque.extend(result) + self._download_pending.update(map(lambda x: x[0], result)) + + def _filter_scan_batch(self, result): + extension = [] + for name in self._download_scan_batch.keys(): + if name in self._download_pending: + # XXX + continue + if len(self._download_scan_batch[name]) == 1: + filename, file_node, metadata = self._download_scan_batch[name][0] + if self._should_download(name, metadata['version']): + extension += [(name, file_node, metadata)] + else: + for item in self._download_scan_batch: + nickname, file_node, metadata = item + if self._should_download(name, metadata['version']): + extension += [(name, file_node, metadata)] + return extension + + def _download_file(self, name, file_node): + print "_download_file" + d = file_node.download_best_version() + def succeeded(res): + d.addCallback(lambda result: self._write_downloaded_file(name, result)) + self._stats_provider.count('magic_folder.objects_downloaded', +1) + def failed(f): + pass + d.addCallbacks(succeeded, failed) + d.addBoth(self._do_download_callback) + return d + + def _write_downloaded_file(self, name, file_contents): + print "_write_downloaded_file: no-op." + def _db_file_is_uploaded(self, childpath): """_db_file_is_uploaded returns true if the file was previously uploaded """ @@ -190,18 +249,44 @@ class MagicFolder(service.MultiService): self._stats_provider.count('magic_folder.dirs_monitored', 1) return d - def upload_ready(self): - """upload_ready is used to signal us to start - processing the upload items... + def ready(self): + """ready is used to signal us to start + processing the upload and download items... """ - self.is_upload_ready = True + self.is_ready = True self._turn_upload_deque() + self._turn_download_deque() + self._scan_remote_collective() + + def _append_to_download_deque(self, name, file_node): + if name in self._download_pending: + return + self._download_deque.append(file_node) # XXX + self._download_pending.add(name) + self._stats_provider.count('magic_folder.objects_queued_for_download', 1) + reactor.callLater(0, self._turn_download_deque) + + def _turn_download_deque(self): + print "_turn_download_deque" + if self._stopped: + return + try: + file_path, file_node, metadata = self._download_deque.pop() + except IndexError: + self._log("magic folder upload deque is now empty") + self._download_lazy_tail = defer.succeed(None) + self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque)) + return + self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) + self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque)) def _append_to_upload_deque(self, path): + if path in self._upload_pending: + return self._upload_deque.append(path) self._upload_pending.add(path) self._stats_provider.count('magic_folder.objects_queued', 1) - if self.is_upload_ready: + if self.is_ready: reactor.callLater(0, self._turn_upload_deque) def _turn_upload_deque(self): @@ -217,21 +302,20 @@ class MagicFolder(service.MultiService): def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) path_u = unicode_from_filepath(path) - if path_u not in self._upload_pending: - self._append_to_upload_deque(path_u) + self._append_to_upload_deque(path_u) def _process(self, path): d = defer.succeed(None) def _add_file(name): u = FileName(path, self._convergence) - return self._upload_dirnode.add_file(name, u, overwrite=True) + return self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True) def _add_dir(name): self._notifier.watch(to_filepath(path), mask=self.mask, callbacks=[self._notify], recursive=True) u = Data("", self._convergence) name += "@_" - d2 = self._upload_dirnode.add_file(name, u, overwrite=True) + d2 = self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True) def _succeeded(ign): self._log("created subdirectory %r" % (path,)) self._stats_provider.count('magic_folder.directories_created', 1) @@ -301,6 +385,13 @@ class MagicFolder(service.MultiService): d.addBoth(self._do_processed_callback) return d + def _do_download_callback(self, res): + if self._download_ignore_count == 0: + self._download_callback(res) + else: + self._download_ignore_count -= 1 + return None # intentionally suppress failures, which have already been logged + def _do_processed_callback(self, res): if self._ignore_count == 0: self._processed_callback(res) @@ -316,7 +407,6 @@ class MagicFolder(service.MultiService): self._download_callback = callback self._download_ignore_count = ignore_count - def set_processed_callback(self, callback, ignore_count=0): """ set_processed_callback sets a function that will be called after a @@ -326,12 +416,17 @@ class MagicFolder(service.MultiService): self._ignore_count = ignore_count def finish(self, for_tests=False): + self._stopped = True self._notifier.stopReading() self._stats_provider.count('magic_folder.dirs_monitored', -1) + if for_tests and hasattr(self._notifier, 'wait_until_stopped'): - return self._notifier.wait_until_stopped() + d = self._notifier.wait_until_stopped() else: - return defer.succeed(None) + d = defer.succeed(None) + + d.addCallback(lambda x: self._download_lazy_tail) + return d def remove_service(self): return service.MultiService.disownServiceParent(self) diff --git a/src/allmydata/test/test_cli_magic_folder.py b/src/allmydata/test/test_cli_magic_folder.py index ee235bcd..df8b84a6 100644 --- a/src/allmydata/test/test_cli_magic_folder.py +++ b/src/allmydata/test/test_cli_magic_folder.py @@ -116,7 +116,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin): magicfolder = MagicFolder(self.get_client(client_num), upload_dircap, collective_dircap, local_magic_dir, dbfile, inotify=self.inotify, pending_delay=0.2) magicfolder.setServiceParent(self.get_client(client_num)) - magicfolder.upload_ready() + magicfolder.ready() return magicfolder def setup_alice_and_bob(self): diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index 59ef855c..2d44f8c2 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -59,7 +59,7 @@ class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqual self.magicfolder = MagicFolder(self.client, self.upload_dircap, self.collective_dircap, self.local_dir, dbfile, inotify=self.inotify, pending_delay=0.2) self.magicfolder.setServiceParent(self.client) - self.magicfolder.upload_ready() + self.magicfolder.ready() # Prevent unclean reactor errors. @@ -349,7 +349,7 @@ class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqual def prepare_for_bob_stats(result): self.stats_provider = self.bob_magicfolder._client.stats_provider d.addCallback(prepare_for_bob_stats) - d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 1)) + d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 1)) def cleanup_Alice_and_Bob(result): d = defer.succeed(None)