From: Daira Hopwood Date: Tue, 3 Nov 2015 02:31:34 +0000 (+0000) Subject: Simplify _scan_remote_* and remove Downloader._download_scan_batch attribute. X-Git-Url: https://git.rkrishnan.org/specifications/%5B/%5D%20/reliability?a=commitdiff_plain;h=07ed9b8e240f3ef148c04a85daa52b838cd9c971;p=tahoe-lafs%2Ftahoe-lafs.git Simplify _scan_remote_* and remove Downloader._download_scan_batch attribute. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index cf23afd1..6d77ceb3 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -515,7 +515,6 @@ class Downloader(QueueMixin, WriteFileMixin): self._upload_readonly_dircap = upload_readonly_dircap self._turn_delay = self.REMOTE_SCAN_INTERVAL - self._download_scan_batch = {} # path -> [(filenode, metadata)] def start_scanning(self): self._log("start_scanning") @@ -589,14 +588,8 @@ class Downloader(QueueMixin, WriteFileMixin): collective_dirmap_d.addCallback(highest_version) return collective_dirmap_d - def _append_to_batch(self, name, file_node, metadata): - if self._download_scan_batch.has_key(name): - self._download_scan_batch[name] += [(file_node, metadata)] - else: - self._download_scan_batch[name] = [(file_node, metadata)] - - def _scan_remote(self, nickname, dirnode): - self._log("_scan_remote nickname %r" % (nickname,)) + def _scan_remote_dmd(self, nickname, dirnode, scan_batch): + self._log("_scan_remote_dmd nickname %r" % (nickname,)) d = dirnode.list() def scan_listing(listing_map): for encoded_relpath_u in listing_map.keys(): @@ -607,16 +600,21 @@ class Downloader(QueueMixin, WriteFileMixin): local_version = self._get_local_latest(relpath_u) remote_version = metadata.get('version', None) self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version)) + if local_version is None or remote_version is None or local_version < remote_version: self._log("%r added to download queue" % (relpath_u,)) - self._append_to_batch(relpath_u, file_node, metadata) + if scan_batch.has_key(relpath_u): + scan_batch[relpath_u] += [(file_node, metadata)] + else: + scan_batch[relpath_u] = [(file_node, metadata)] + d.addCallback(scan_listing) - d.addBoth(self._logcb, "end of _scan_remote") + d.addBoth(self._logcb, "end of _scan_remote_dmd") return d def _scan_remote_collective(self): self._log("_scan_remote_collective") - self._download_scan_batch = {} # XXX + scan_batch = {} # path -> [(filenode, metadata)] d = self._collective_dirnode.list() def scan_collective(dirmap): @@ -624,38 +622,31 @@ class Downloader(QueueMixin, WriteFileMixin): for dir_name in dirmap: (dirnode, metadata) = dirmap[dir_name] if dirnode.get_readonly_uri() != self._upload_readonly_dircap: - d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode)) + d2.addCallback(lambda ign, dir_name=dir_name: + self._scan_remote_dmd(dir_name, dirnode, scan_batch)) def _err(f): self._log("failed to scan DMD for client %r: %s" % (dir_name, f)) # XXX what should we do to make this failure more visible to users? d2.addErrback(_err) + return d2 d.addCallback(scan_collective) - d.addCallback(self._filter_scan_batch) - d.addCallback(self._add_batch_to_download_queue) - return d - def _add_batch_to_download_queue(self, result): - self._log("result = %r" % (result,)) - self._log("deque = %r" % (self._deque,)) - self._deque.extend(result) - self._log("deque after = %r" % (self._deque,)) - self._count('objects_queued', len(result)) - - def _filter_scan_batch(self, result): - self._log("_filter_scan_batch") - extension = [] # consider whether this should be a dict - for relpath_u in self._download_scan_batch.keys(): - if relpath_u in self._pending: - continue - file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version']) - if self._should_download(relpath_u, metadata['version']): - extension += [(relpath_u, file_node, metadata)] - else: - self._log("Excluding %r" % (relpath_u,)) - self._count('objects_excluded') - self._call_hook(None, 'processed') - return extension + def _filter_batch_to_deque(ign): + self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch)) + for relpath_u in scan_batch.keys(): + file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version']) + + if self._should_download(relpath_u, metadata['version']): + self._deque.append( (relpath_u, file_node, metadata) ) + else: + self._log("Excluding %r" % (relpath_u,)) + self._count('objects_excluded') + self._call_hook(None, 'processed') + + self._log("deque after = %r" % (self._deque,)) + d.addCallback(_filter_batch_to_deque) + return d def _when_queue_is_empty(self): d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)