]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Add rough naive downloader + remote scan
authorDavid Stainton <dstainton415@gmail.com>
Thu, 2 Jul 2015 04:11:05 +0000 (21:11 -0700)
committerDavid Stainton <dstainton415@gmail.com>
Thu, 2 Jul 2015 04:11:05 +0000 (21:11 -0700)
- makes the basic naive Alice + Bob unit test pass
-  `_should_download` is currently a stub function and
should be implemented
- handling of local+remote file versions is currently faked...
and should be implemented

src/allmydata/client.py
src/allmydata/frontends/magic_folder.py
src/allmydata/test/test_cli_magic_folder.py
src/allmydata/test/test_magic_folder.py

index 1175882cdafa509caedf79056b1900a0fa512593..9c47774ad264605c2a7be909a4a5731de5351169 100644 (file)
@@ -520,7 +520,7 @@ class Client(node.Node, pollmixin.PollMixin):
                 s.startService()
 
                 # start processing the upload queue when we've connected to enough servers
-                self.upload_ready_d.addCallback(s.upload_ready)
+                self.upload_ready_d.addCallback(s.ready)
             except Exception, e:
                 self.log("couldn't start Magic Folder: %r", args=(e,))
 
index 8c304ef95c2e3aed3c5427df06219d7f94fe292f..e587b59ec37d45b6691f546c0ccd255ac9a66968 100644 (file)
@@ -45,17 +45,23 @@ class MagicFolder(service.MultiService):
         precondition_abspath(local_dir)
 
         service.MultiService.__init__(self)
+        self._stopped = False
+        self._remote_scan_delay = 3 # XXX
         self._local_dir = abspath_expanduser_unicode(local_dir)
         self._upload_lazy_tail = defer.succeed(None)
         self._upload_pending = set()
+        self._download_scan_batch = {}
+        self._download_lazy_tail = defer.succeed(None)
+        self._download_pending = set()
         self._client = client
         self._stats_provider = client.stats_provider
         self._convergence = client.convergence
         self._local_path = to_filepath(self._local_dir)
         self._dbfile = dbfile
 
+        self._download_deque = deque()
         self._upload_deque = deque()
-        self.is_upload_ready = False
+        self.is_ready = False
 
         self._inotify = inotify or get_inotify_module()
 
@@ -108,6 +114,29 @@ class MagicFolder(service.MultiService):
                              recursive=True)
 
 
+        self._scan_remote_collective()
+
+    def _should_download(self, path, remote_version):
+        """
+        _should_download returns a bool indicating whether or not a remote object should be downloaded.
+        We check the remote metadata version against our magic-folder db version number;
+        latest version wins.
+        """
+        # XXX todo
+        return True
+
+    def _scan_remote(self, nickname, dirnode):
+        listing_d = dirnode.list()
+        def scan_listing(listing_map):
+            for name in listing_map.keys():
+                file_node, metadata = listing_map[name]
+                if self._download_scan_batch.has_key(name):
+                    self._download_scan_batch[name] += [(name, file_node, metadata)]
+                else:
+                    self._download_scan_batch[name] = [(name, file_node, metadata)]
+        listing_d.addCallback(scan_listing)
+        return listing_d
+
     def _scan_remote_collective(self):
         upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
         collective_dirmap_d = self._collective_dirnode.list()
@@ -117,23 +146,53 @@ class MagicFolder(service.MultiService):
             others = filter(not_mine, result.keys())
             return result, others
         collective_dirmap_d.addCallback(do_filter)
-        def do_scans(result):
+        def scan_collective(result):
             d = defer.succeed(None)
-            collective_dirmap, others = result
-            for dir_name in others:
-                d.addCallback(self._scan_remote(collective_dirmap[dir_name][0]))
+            collective_dirmap, others_list = result
+            for dir_name in others_list:
+                d.addCallback(lambda x: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
             return d
-        collective_dirmap_d.addCallback(do_scans)
+        collective_dirmap_d.addCallback(scan_collective)
+        collective_dirmap_d.addCallback(self._filter_scan_batch)
+        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
         return collective_dirmap_d
 
-    def _scan_remote(self, dirnode):
-        listing_d = dirnode.list()
-        def display_listing(result):
-            return result.keys()
-        listing_d.addCallback(display_listing)
-        # XXX ...
-        return listing_d
-        
+    def _add_batch_to_download_queue(self, result):
+        self._download_deque.extend(result)
+        self._download_pending.update(map(lambda x: x[0], result))
+
+    def _filter_scan_batch(self, result):
+        extension = []
+        for name in self._download_scan_batch.keys():
+            if name in self._download_pending:
+                # XXX
+                continue
+            if len(self._download_scan_batch[name]) == 1:
+                filename, file_node, metadata = self._download_scan_batch[name][0]
+                if self._should_download(name, metadata['version']):
+                    extension += [(name, file_node, metadata)]
+            else:
+                for item in self._download_scan_batch:
+                    nickname, file_node, metadata = item
+                    if self._should_download(name, metadata['version']):
+                        extension += [(name, file_node, metadata)]
+        return extension
+
+    def _download_file(self, name, file_node):
+        print "_download_file"
+        d = file_node.download_best_version()
+        def succeeded(res):
+            d.addCallback(lambda result: self._write_downloaded_file(name, result))
+            self._stats_provider.count('magic_folder.objects_downloaded', +1)
+        def failed(f):
+            pass
+        d.addCallbacks(succeeded, failed)
+        d.addBoth(self._do_download_callback)
+        return d
+
+    def _write_downloaded_file(self, name, file_contents):
+        print "_write_downloaded_file: no-op."
+
     def _db_file_is_uploaded(self, childpath):
         """_db_file_is_uploaded returns true if the file was previously uploaded
         """ 
@@ -190,18 +249,44 @@ class MagicFolder(service.MultiService):
         self._stats_provider.count('magic_folder.dirs_monitored', 1)
         return d
 
-    def upload_ready(self):
-        """upload_ready is used to signal us to start
-        processing the upload items...
+    def ready(self):
+        """ready is used to signal us to start
+        processing the upload and download items...
         """
-        self.is_upload_ready = True
+        self.is_ready = True
         self._turn_upload_deque()
+        self._turn_download_deque()
+        self._scan_remote_collective()
+
+    def _append_to_download_deque(self, name, file_node):
+        if name in self._download_pending:
+            return
+        self._download_deque.append(file_node) # XXX
+        self._download_pending.add(name)
+        self._stats_provider.count('magic_folder.objects_queued_for_download', 1)
+        reactor.callLater(0, self._turn_download_deque)
+
+    def _turn_download_deque(self):
+        print "_turn_download_deque"
+        if self._stopped:
+            return
+        try:
+            file_path, file_node, metadata = self._download_deque.pop()
+        except IndexError:
+            self._log("magic folder upload deque is now empty")
+            self._download_lazy_tail = defer.succeed(None)
+            self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
+            return
+        self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
+        self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
 
     def _append_to_upload_deque(self, path):
+        if path in self._upload_pending:
+            return
         self._upload_deque.append(path)
         self._upload_pending.add(path)
         self._stats_provider.count('magic_folder.objects_queued', 1)
-        if self.is_upload_ready:
+        if self.is_ready:
             reactor.callLater(0, self._turn_upload_deque)
 
     def _turn_upload_deque(self):
@@ -217,21 +302,20 @@ class MagicFolder(service.MultiService):
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
         path_u = unicode_from_filepath(path)
-        if path_u not in self._upload_pending:
-            self._append_to_upload_deque(path_u)
+        self._append_to_upload_deque(path_u)
 
     def _process(self, path):
         d = defer.succeed(None)
 
         def _add_file(name):
             u = FileName(path, self._convergence)
-            return self._upload_dirnode.add_file(name, u, overwrite=True)
+            return self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True)
 
         def _add_dir(name):
             self._notifier.watch(to_filepath(path), mask=self.mask, callbacks=[self._notify], recursive=True)
             u = Data("", self._convergence)
             name += "@_"
-            d2 = self._upload_dirnode.add_file(name, u, overwrite=True)
+            d2 = self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True)
             def _succeeded(ign):
                 self._log("created subdirectory %r" % (path,))
                 self._stats_provider.count('magic_folder.directories_created', 1)
@@ -301,6 +385,13 @@ class MagicFolder(service.MultiService):
         d.addBoth(self._do_processed_callback)
         return d
 
+    def _do_download_callback(self, res):
+        if self._download_ignore_count == 0:
+            self._download_callback(res)
+        else:
+            self._download_ignore_count -= 1
+        return None  # intentionally suppress failures, which have already been logged
+
     def _do_processed_callback(self, res):
         if self._ignore_count == 0:
             self._processed_callback(res)
@@ -316,7 +407,6 @@ class MagicFolder(service.MultiService):
         self._download_callback = callback
         self._download_ignore_count = ignore_count
 
-
     def set_processed_callback(self, callback, ignore_count=0):
         """
         set_processed_callback sets a function that will be called after a
@@ -326,12 +416,17 @@ class MagicFolder(service.MultiService):
         self._ignore_count = ignore_count
 
     def finish(self, for_tests=False):
+        self._stopped = True
         self._notifier.stopReading()
         self._stats_provider.count('magic_folder.dirs_monitored', -1)
+
         if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
-            return self._notifier.wait_until_stopped()
+            d = self._notifier.wait_until_stopped()
         else:
-            return defer.succeed(None)
+            d = defer.succeed(None)
+
+        d.addCallback(lambda x: self._download_lazy_tail)
+        return d
 
     def remove_service(self):
         return service.MultiService.disownServiceParent(self)
index ee235bcda450cde211605a9adaa80318c7ae6601..df8b84a619ea88e442c2fbfbaa53741abbf2be2a 100644 (file)
@@ -116,7 +116,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
         magicfolder = MagicFolder(self.get_client(client_num), upload_dircap, collective_dircap, local_magic_dir,
                                        dbfile, inotify=self.inotify, pending_delay=0.2)
         magicfolder.setServiceParent(self.get_client(client_num))
-        magicfolder.upload_ready()
+        magicfolder.ready()
         return magicfolder
 
     def setup_alice_and_bob(self):
index 59ef855c1d32039182e002833b78b439e21d91ef..2d44f8c2408fc43f2ae407ed7c25cbb2c7fdc794 100644 (file)
@@ -59,7 +59,7 @@ class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqual
         self.magicfolder = MagicFolder(self.client, self.upload_dircap, self.collective_dircap, self.local_dir,
                                        dbfile, inotify=self.inotify, pending_delay=0.2)
         self.magicfolder.setServiceParent(self.client)
-        self.magicfolder.upload_ready()
+        self.magicfolder.ready()
 
     # Prevent unclean reactor errors.
 
@@ -349,7 +349,7 @@ class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqual
         def prepare_for_bob_stats(result):
             self.stats_provider = self.bob_magicfolder._client.stats_provider
         d.addCallback(prepare_for_bob_stats)
-        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 1))
 
         def cleanup_Alice_and_Bob(result):
             d = defer.succeed(None)