self._upload_readonly_dircap = upload_readonly_dircap
self._turn_delay = self.REMOTE_SCAN_INTERVAL
- self._download_scan_batch = {} # path -> [(filenode, metadata)]
def start_scanning(self):
self._log("start_scanning")
collective_dirmap_d.addCallback(highest_version)
return collective_dirmap_d
- def _append_to_batch(self, name, file_node, metadata):
- if self._download_scan_batch.has_key(name):
- self._download_scan_batch[name] += [(file_node, metadata)]
- else:
- self._download_scan_batch[name] = [(file_node, metadata)]
-
- def _scan_remote(self, nickname, dirnode):
- self._log("_scan_remote nickname %r" % (nickname,))
+ def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
+ self._log("_scan_remote_dmd nickname %r" % (nickname,))
d = dirnode.list()
def scan_listing(listing_map):
for encoded_relpath_u in listing_map.keys():
local_version = self._get_local_latest(relpath_u)
remote_version = metadata.get('version', None)
self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
+
if local_version is None or remote_version is None or local_version < remote_version:
self._log("%r added to download queue" % (relpath_u,))
- self._append_to_batch(relpath_u, file_node, metadata)
+ if scan_batch.has_key(relpath_u):
+ scan_batch[relpath_u] += [(file_node, metadata)]
+ else:
+ scan_batch[relpath_u] = [(file_node, metadata)]
+
d.addCallback(scan_listing)
- d.addBoth(self._logcb, "end of _scan_remote")
+ d.addBoth(self._logcb, "end of _scan_remote_dmd")
return d
def _scan_remote_collective(self):
self._log("_scan_remote_collective")
- self._download_scan_batch = {} # XXX
+ scan_batch = {} # path -> [(filenode, metadata)]
d = self._collective_dirnode.list()
def scan_collective(dirmap):
for dir_name in dirmap:
(dirnode, metadata) = dirmap[dir_name]
if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
- d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
+ d2.addCallback(lambda ign, dir_name=dir_name:
+ self._scan_remote_dmd(dir_name, dirnode, scan_batch))
def _err(f):
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
# XXX what should we do to make this failure more visible to users?
d2.addErrback(_err)
+
return d2
d.addCallback(scan_collective)
- d.addCallback(self._filter_scan_batch)
- d.addCallback(self._add_batch_to_download_queue)
- return d
- def _add_batch_to_download_queue(self, result):
- self._log("result = %r" % (result,))
- self._log("deque = %r" % (self._deque,))
- self._deque.extend(result)
- self._log("deque after = %r" % (self._deque,))
- self._count('objects_queued', len(result))
-
- def _filter_scan_batch(self, result):
- self._log("_filter_scan_batch")
- extension = [] # consider whether this should be a dict
- for relpath_u in self._download_scan_batch.keys():
- if relpath_u in self._pending:
- continue
- file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
- if self._should_download(relpath_u, metadata['version']):
- extension += [(relpath_u, file_node, metadata)]
- else:
- self._log("Excluding %r" % (relpath_u,))
- self._count('objects_excluded')
- self._call_hook(None, 'processed')
- return extension
+ def _filter_batch_to_deque(ign):
+ self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
+ for relpath_u in scan_batch.keys():
+ file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
+
+ if self._should_download(relpath_u, metadata['version']):
+ self._deque.append( (relpath_u, file_node, metadata) )
+ else:
+ self._log("Excluding %r" % (relpath_u,))
+ self._count('objects_excluded')
+ self._call_hook(None, 'processed')
+
+ self._log("deque after = %r" % (self._deque,))
+ d.addCallback(_filter_batch_to_deque)
+ return d
def _when_queue_is_empty(self):
d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)