From: David Stainton Date: Tue, 28 Jul 2015 00:00:39 +0000 (-0700) Subject: wip X-Git-Url: https://git.rkrishnan.org/uri//%22%22?a=commitdiff_plain;h=0e3640a252422820b296f2acc7559bb2de731bef;p=tahoe-lafs%2Ftahoe-lafs.git wip --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 4627e471..5ee96a2c 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -74,8 +74,14 @@ class MagicFolder(service.MultiService): self.downloader.start_scanning() def finish(self): + print "finish" d = self.uploader.stop() + def _print(f): + print f + return f + d.addErrback(_print) d.addBoth(lambda ign: self.downloader.stop()) + d.addErrback(_print) return d def remove_service(self): @@ -167,6 +173,7 @@ class Uploader(QueueMixin): return d def stop(self): + print "stop: _deque = %r, _pending = %r" % (self._deque, self._pending) self._notifier.stopReading() self._counter('magic_folder.dirs_monitored', -1) @@ -174,6 +181,10 @@ class Uploader(QueueMixin): d = self._notifier.wait_until_stopped() else: d = defer.succeed(None) + def _after(res): + print "stop _after: res = %r, _deque = %r, _pending = %r" % (res, self._deque, self._pending) + return res + d.addBoth(_after) return d def start_scanning(self): @@ -377,13 +388,22 @@ class Downloader(QueueMixin): self._remote_scan_delay = 3 # XXX self._download_scan_batch = {} # path -> [(filenode, metadata)] + print "Downloader init" def start_scanning(self): + print "downloader start_scanning" self._scan_remote_collective() self._turn_deque() def stop(self): - return self._lazy_tail + print "downloader stop" + d = defer.succeed(None) + d.addCallback(lambda ign: self._lazy_tail) + def _print(res): + print "downloader stop _after: res = %r, _deque = %r, _pending = %r" % (res, self._deque, self._pending) + return res + d.addBoth(_print) + return d def _should_download(self, relpath_u, remote_version): """ @@ -391,6 +411,7 @@ class Downloader(QueueMixin): We check the remote metadata version against our magic-folder db version number; latest version wins. """ + print "_should_download" v = self._db.get_local_file_version(relpath_u) return (v is None or v < remote_version) @@ -432,19 +453,29 @@ class Downloader(QueueMixin): return collective_dirmap_d def _scan_remote(self, nickname, dirnode): + print "_scan_remote START: nickname %s dirnode %s" % (nickname, dirnode) listing_d = dirnode.list() - self._download_scan_batch = {} def scan_listing(listing_map): for name in listing_map.keys(): + print "name ", name file_node, metadata = listing_map[name] + print "ALL KEYS %s" % (self._download_scan_batch.keys(),) if self._download_scan_batch.has_key(name): + print "HAS KEY - %s %s" % (file_node, metadata) self._download_scan_batch[name] += [(file_node, metadata)] else: + print "NOT HAS KEY" self._download_scan_batch[name] = [(file_node, metadata)] + + print "download scan batch before filtering", repr(self._download_scan_batch) listing_d.addCallback(scan_listing) + print "_scan_remote END" return listing_d def _scan_remote_collective(self): + self._download_scan_batch = {} # XXX + + print "downloader _scan_remote_collective" if self._collective_dirnode is None: return collective_dirmap_d = self._collective_dirnode.list() @@ -463,7 +494,12 @@ class Downloader(QueueMixin): return d collective_dirmap_d.addCallback(scan_collective) collective_dirmap_d.addCallback(self._filter_scan_batch) + def _print(f): + print f + return f + collective_dirmap_d.addErrback(_print) collective_dirmap_d.addCallback(self._add_batch_to_download_queue) + print "end of _scan_remote_collective" return collective_dirmap_d def _add_batch_to_download_queue(self, result): @@ -471,30 +507,40 @@ class Downloader(QueueMixin): self._pending.update(map(lambda x: x[0], result)) def _filter_scan_batch(self, result): + print "FILTER START len %s" % (len(self._download_scan_batch),) extension = [] # consider whether this should be a dict for name in self._download_scan_batch.keys(): if name in self._pending: + print "downloader: %s found in pending; skipping" % (name,) continue file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version']) + print "file_node %s metadata %s" % (file_node, metadata) if self._should_download(name, metadata['version']): + print "should download" extension += [(name, file_node, metadata)] + else: + print "should not download" + print "FILTER END" return extension def _download_file(self, name, file_node): + print "_download_file" d = file_node.download_best_version() def succeeded(res): d.addCallback(lambda result: self._write_downloaded_file(name, result)) self._counter('magic_folder.objects_downloaded', 1) return None def failed(f): - self._log("download failed") + self._log("download failed: %s" % (str(f),)) self._counter('magic_folder.objects_download_failed', 1) return f - def remove_from_pending(result): - self._pending = self._pending.difference(set([name])) + def remove_from_pending(ign): + print "REMOVE FROM PENDING _pending = %r, name = %r" % (self._pending, name) + self._pending.remove(name) + print "REMOVE FROM PENDING _after: _pending = %r" % (self._pending,) d.addCallbacks(succeeded, failed) d.addBoth(self._do_callback) - d.addBoth(remove_from_pending) + d.addCallback(remove_from_pending) return d def _write_downloaded_file(self, name, file_contents): @@ -502,6 +548,7 @@ class Downloader(QueueMixin): # FIXME move to QueueMixin def _append_to_deque(self, path): + print "downloader _append_to_deque" if path in self._download_scan_batch.keys(): return self._deque.append(path) @@ -512,6 +559,7 @@ class Downloader(QueueMixin): # FIXME move to QueueMixin def _turn_deque(self): + print "downloader _turn_deque" #if self._stopped: # return try: diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index b6bd5323..28c94064 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -142,7 +142,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix self.mkdir_nonascii(small_tree_dir) fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when") d2 = defer.Deferred() - self.magicfolder.set_callback(d2.callback, ignore_count=1) + self.magicfolder.uploader.set_callback(d2.callback, ignore_count=1) os.rename(small_tree_dir, new_small_tree_dir) self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO) return d2 @@ -154,7 +154,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix def _check_moved_tree_is_watched(res): d2 = defer.Deferred() - self.magicfolder.set_callback(d2.callback) + self.magicfolder.uploader.set_callback(d2.callback) fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file") self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE) return d2 @@ -380,7 +380,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix def Alice_rewrite_file(result): print "Alice rewrites file\n" - self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder._local_dir) + self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u) fileutil.write(self.file_path, "Alice suddenly sees the white rabbit running into the forest.") self.magicfolder = self.alice_magicfolder self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)