precondition_abspath(local_dir)
service.MultiService.__init__(self)
+ self._stopped = False
+ self._remote_scan_delay = 3 # XXX
self._local_dir = abspath_expanduser_unicode(local_dir)
self._upload_lazy_tail = defer.succeed(None)
self._upload_pending = set()
+ self._download_scan_batch = {}
+ self._download_lazy_tail = defer.succeed(None)
+ self._download_pending = set()
self._client = client
self._stats_provider = client.stats_provider
self._convergence = client.convergence
self._local_path = to_filepath(self._local_dir)
self._dbfile = dbfile
+ self._download_deque = deque()
self._upload_deque = deque()
- self.is_upload_ready = False
+ self.is_ready = False
self._inotify = inotify or get_inotify_module()
recursive=True)
+ self._scan_remote_collective()
+
+ def _should_download(self, path, remote_version):
+ """
+ _should_download returns a bool indicating whether or not a remote object should be downloaded.
+ We check the remote metadata version against our magic-folder db version number;
+ latest version wins.
+ """
+ # XXX todo
+ return True
+
+ def _scan_remote(self, nickname, dirnode):
+ listing_d = dirnode.list()
+ def scan_listing(listing_map):
+ for name in listing_map.keys():
+ file_node, metadata = listing_map[name]
+ if self._download_scan_batch.has_key(name):
+ self._download_scan_batch[name] += [(name, file_node, metadata)]
+ else:
+ self._download_scan_batch[name] = [(name, file_node, metadata)]
+ listing_d.addCallback(scan_listing)
+ return listing_d
+
def _scan_remote_collective(self):
upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
collective_dirmap_d = self._collective_dirnode.list()
others = filter(not_mine, result.keys())
return result, others
collective_dirmap_d.addCallback(do_filter)
- def do_scans(result):
+ def scan_collective(result):
d = defer.succeed(None)
- collective_dirmap, others = result
- for dir_name in others:
- d.addCallback(self._scan_remote(collective_dirmap[dir_name][0]))
+ collective_dirmap, others_list = result
+ for dir_name in others_list:
+ d.addCallback(lambda x: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
return d
- collective_dirmap_d.addCallback(do_scans)
+ collective_dirmap_d.addCallback(scan_collective)
+ collective_dirmap_d.addCallback(self._filter_scan_batch)
+ collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
return collective_dirmap_d
- def _scan_remote(self, dirnode):
- listing_d = dirnode.list()
- def display_listing(result):
- return result.keys()
- listing_d.addCallback(display_listing)
- # XXX ...
- return listing_d
-
+ def _add_batch_to_download_queue(self, result):
+ self._download_deque.extend(result)
+ self._download_pending.update(map(lambda x: x[0], result))
+
+ def _filter_scan_batch(self, result):
+ extension = []
+ for name in self._download_scan_batch.keys():
+ if name in self._download_pending:
+ # XXX
+ continue
+ if len(self._download_scan_batch[name]) == 1:
+ filename, file_node, metadata = self._download_scan_batch[name][0]
+ if self._should_download(name, metadata['version']):
+ extension += [(name, file_node, metadata)]
+ else:
+ for item in self._download_scan_batch:
+ nickname, file_node, metadata = item
+ if self._should_download(name, metadata['version']):
+ extension += [(name, file_node, metadata)]
+ return extension
+
+ def _download_file(self, name, file_node):
+ print "_download_file"
+ d = file_node.download_best_version()
+ def succeeded(res):
+ d.addCallback(lambda result: self._write_downloaded_file(name, result))
+ self._stats_provider.count('magic_folder.objects_downloaded', +1)
+ def failed(f):
+ pass
+ d.addCallbacks(succeeded, failed)
+ d.addBoth(self._do_download_callback)
+ return d
+
+ def _write_downloaded_file(self, name, file_contents):
+ print "_write_downloaded_file: no-op."
+
def _db_file_is_uploaded(self, childpath):
"""_db_file_is_uploaded returns true if the file was previously uploaded
"""
self._stats_provider.count('magic_folder.dirs_monitored', 1)
return d
- def upload_ready(self):
- """upload_ready is used to signal us to start
- processing the upload items...
+ def ready(self):
+ """ready is used to signal us to start
+ processing the upload and download items...
"""
- self.is_upload_ready = True
+ self.is_ready = True
self._turn_upload_deque()
+ self._turn_download_deque()
+ self._scan_remote_collective()
+
+ def _append_to_download_deque(self, name, file_node):
+ if name in self._download_pending:
+ return
+ self._download_deque.append(file_node) # XXX
+ self._download_pending.add(name)
+ self._stats_provider.count('magic_folder.objects_queued_for_download', 1)
+ reactor.callLater(0, self._turn_download_deque)
+
+ def _turn_download_deque(self):
+ print "_turn_download_deque"
+ if self._stopped:
+ return
+ try:
+ file_path, file_node, metadata = self._download_deque.pop()
+ except IndexError:
+ self._log("magic folder upload deque is now empty")
+ self._download_lazy_tail = defer.succeed(None)
+ self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
+ return
+ self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
+ self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
def _append_to_upload_deque(self, path):
+ if path in self._upload_pending:
+ return
self._upload_deque.append(path)
self._upload_pending.add(path)
self._stats_provider.count('magic_folder.objects_queued', 1)
- if self.is_upload_ready:
+ if self.is_ready:
reactor.callLater(0, self._turn_upload_deque)
def _turn_upload_deque(self):
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
path_u = unicode_from_filepath(path)
- if path_u not in self._upload_pending:
- self._append_to_upload_deque(path_u)
+ self._append_to_upload_deque(path_u)
def _process(self, path):
d = defer.succeed(None)
def _add_file(name):
u = FileName(path, self._convergence)
- return self._upload_dirnode.add_file(name, u, overwrite=True)
+ return self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True)
def _add_dir(name):
self._notifier.watch(to_filepath(path), mask=self.mask, callbacks=[self._notify], recursive=True)
u = Data("", self._convergence)
name += "@_"
- d2 = self._upload_dirnode.add_file(name, u, overwrite=True)
+ d2 = self._upload_dirnode.add_file(name, u, metadata={"version":1}, overwrite=True)
def _succeeded(ign):
self._log("created subdirectory %r" % (path,))
self._stats_provider.count('magic_folder.directories_created', 1)
d.addBoth(self._do_processed_callback)
return d
+ def _do_download_callback(self, res):
+ if self._download_ignore_count == 0:
+ self._download_callback(res)
+ else:
+ self._download_ignore_count -= 1
+ return None # intentionally suppress failures, which have already been logged
+
def _do_processed_callback(self, res):
if self._ignore_count == 0:
self._processed_callback(res)
self._download_callback = callback
self._download_ignore_count = ignore_count
-
def set_processed_callback(self, callback, ignore_count=0):
"""
set_processed_callback sets a function that will be called after a
self._ignore_count = ignore_count
def finish(self, for_tests=False):
+ self._stopped = True
self._notifier.stopReading()
self._stats_provider.count('magic_folder.dirs_monitored', -1)
+
if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
- return self._notifier.wait_until_stopped()
+ d = self._notifier.wait_until_stopped()
else:
- return defer.succeed(None)
+ d = defer.succeed(None)
+
+ d.addCallback(lambda x: self._download_lazy_tail)
+ return d
def remove_service(self):
return service.MultiService.disownServiceParent(self)