from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
+from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
extend_filepath, unicode_from_filepath, unicode_segments_from, \
quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
% quote_local_unicode_path(self._local_path_u))
self._deque = deque()
+ # do we also want to bound on "maximum age"?
+ self._process_history = deque(maxlen=10)
self._lazy_tail = defer.succeed(None)
self._stopped = False
self._turn_delay = 0
def get_status(self):
    """
    Returns an iterable of instances that implement IQueuedItem

    Pending items (still on the deque) are yielded first, followed by
    the bounded history of already-processed items.
    """
    for source in (self._deque, self._process_history):
        for entry in source:
            yield entry
def _get_filepath(self, relpath_u):
    """
    Map a magic-folder relative path (u'/'-separated) onto a FilePath
    beneath our local directory.
    """
    self._log("_get_filepath(%r)" % (relpath_u,))
    segments = relpath_u.split(u"/")
    return extend_filepath(self._local_filepath, segments)
self._log("stopped")
return
try:
- item = self._deque.pop()
- self._log("popped %r" % (item,))
+ item = IQueuedItem(self._deque.pop())
+ self._process_history.append(item)
+
+ self._log("popped %r, now have %d" % (item, len(self._deque)))
self._count('objects_queued', -1)
except IndexError:
self._log("deque is now empty")
self._lazy_tail.addBoth(self._logcb, "got past _process")
self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
+ self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
self._lazy_tail.addBoth(self._logcb, "got past deferLater")
except Exception as e:
raise
+from zope.interface import Interface, implementer
+
class IQueuedItem(Interface):
    """
    Marker interface for items tracked by a processing queue (see
    ``get_status``, which yields providers of this interface).
    """
+
+
@implementer(IQueuedItem)
class QueuedItem(object):
    """
    A single unit of queued work, remembering the timestamps at which it
    entered each processing state ('queued', 'started', 'success', ...).
    """

    def __init__(self, relpath_u, progress):
        # relpath_u: magic-folder-relative path this item refers to
        # progress: progress-tracking object (e.g. PercentProgress)
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        """
        Record that this item reached `status` at `current_time`
        (defaults to time.time()).
        """
        if current_time is None:
            current_time = time.time()
        # NOTE: keyed by status name, so re-entering the same status
        # overwrites the earlier timestamp.
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        # sorted() with a key= is equivalent to the old
        # items(); sort(cmp) dance on Python 2, and keeps working on
        # Python 3 where dict.items() is an unsortable view and
        # list.sort() no longer accepts a cmp function.
        return sorted(self._status_history.items(), key=lambda item: item[1])
+
+
class UploadItem(QueuedItem):
    """
    A QueuedItem representing a single pending upload.
    """
+
+
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
immediate=False):
def _extend_queue_and_keep_going(self, relpaths_u):
    """
    Wrap each given relative path in an UploadItem (with a fresh
    PercentProgress tracker), stamp it 'queued', and append it to the
    processing deque.

    NOTE(review): self._count is given len(relpaths_u), so the argument
    must be a sized container, not a one-shot generator — confirm callers.
    """
    self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
    for relpath_u in relpaths_u:
        progress = PercentProgress()
        item = UploadItem(relpath_u, progress)
        # use the reactor clock so tests can control timestamps
        item.set_status('queued', self._clock.seconds())
        self._deque.append(item)

    self._count('objects_queued', len(relpaths_u))
if self.is_ready:
self._extend_queue_and_keep_going(self._pending)
def _add_pending(self, relpath_u):
- self._log("add pending %r" % (relpath_u,))
+ self._log("add pending %r" % (relpath_u,))
if not magicpath.should_ignore_file(relpath_u):
self._pending.add(relpath_u)
def _when_queue_is_empty(self):
    # NOTE(review): appears to be the queue-drained hook — the Uploader
    # has nothing extra to wait for, so hand back an already-fired
    # Deferred.
    return defer.succeed(None)
- def _process(self, relpath_u):
+ def _process(self, item):
# Uploader
+ relpath_u = item.relpath_u
self._log("_process(%r)" % (relpath_u,))
+ item.set_status('started', self._clock.seconds())
+
if relpath_u is None:
+ item.set_status('invalid_path', self._clock.seconds())
return
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
empty_uploadable = Data("", self._client.convergence)
- d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
- metadata=metadata, overwrite=True)
+ d2 = self._upload_dirnode.add_file(
+ encoded_path_u, empty_uploadable,
+ metadata=metadata,
+ overwrite=True,
+ progress=item.progress,
+ )
def _add_db_entry(filenode):
filecap = filenode.get_uri()
uploadable = Data("", self._client.convergence)
encoded_path_u += magicpath.path2magic(u"/")
self._log("encoded_path_u = %r" % (encoded_path_u,))
- upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+ upload_d = self._upload_dirnode.add_file(
+ encoded_path_u, uploadable,
+ metadata={"version": 0},
+ overwrite=True,
+ progress=item.progress,
+ )
def _dir_succeeded(ign):
self._log("created subdirectory %r" % (relpath_u,))
self._count('directories_created')
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
- d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
- metadata=metadata, overwrite=True)
+ d2 = self._upload_dirnode.add_file(
+ encoded_path_u, uploadable,
+ metadata=metadata,
+ overwrite=True,
+ progress=item.progress,
+ )
def _add_db_entry(filenode):
filecap = filenode.get_uri()
def _succeeded(res):
self._count('objects_succeeded')
+ item.set_status('success', self._clock.seconds())
return res
def _failed(f):
self._count('objects_failed')
self._log("%s while processing %r" % (f, relpath_u))
+ item.set_status('failure', self._clock.seconds())
return f
d.addCallbacks(_succeeded, _failed)
return d
return abspath_u
class DownloadItem(QueuedItem):
    """
    A QueuedItem representing a pending download: additionally remembers
    the remote file-node to fetch from and its accompanying metadata.
    """
    def __init__(self, relpath_u, progress, filenode, metadata):
        # filenode: remote node whose best version will be downloaded
        # metadata: dict read for 'version', 'deleted',
        #   'last_downloaded_uri', 'last_uploaded_uri' by Downloader._process
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.file_node = filenode
        self.metadata = metadata
+
+
class Downloader(QueueMixin, WriteFileMixin):
REMOTE_SCAN_INTERVAL = 3 # facilitates tests
def start_downloading(self):
self._log("start_downloading")
+ self._turn_delay = self.REMOTE_SCAN_INTERVAL
files = self._db.get_all_relpaths()
self._log("all files %s" % files)
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version']):
- self._deque.append( (relpath_u, file_node, metadata) )
+ to_dl = DownloadItem(
+ relpath_u,
+ PercentProgress(file_node.get_size()),
+ file_node,
+ metadata,
+ )
+ to_dl.set_status('queued', self._clock.seconds())
+ self._deque.append(to_dl)
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True)
def _process(self, item, now=None):
# Downloader
self._log("_process(%r)" % (item,))
- if now is None:
- now = time.time()
- (relpath_u, file_node, metadata) = item
- fp = self._get_filepath(relpath_u)
+ if now is None: # XXX why can we pass in now?
+ now = time.time() # self._clock.seconds()
+
+ self._log("started! %s" % (now,))
+ item.set_status('started', now)
+ fp = self._get_filepath(item.relpath_u)
abspath_u = unicode_from_filepath(fp)
conflict_path_u = self._get_conflicted_filename(abspath_u)
d = defer.succeed(None)
def do_update_db(written_abspath_u):
- filecap = file_node.get_uri()
- last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+ filecap = item.file_node.get_uri()
+ last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
last_downloaded_uri = filecap
last_downloaded_timestamp = now
written_pathinfo = get_pathinfo(written_abspath_u)
- if not written_pathinfo.exists and not metadata.get('deleted', False):
+ if not written_pathinfo.exists and not item.metadata.get('deleted', False):
raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
- self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
- last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
+ self._db.did_upload_version(
+ item.relpath_u, item.metadata['version'], last_uploaded_uri,
+ last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
+ )
self._count('objects_downloaded')
+ item.set_status('success', self._clock.seconds())
+
def failed(f):
+ item.set_status('failure', self._clock.seconds())
self._log("download failed: %s" % (str(f),))
self._count('objects_failed')
return f
if os.path.isfile(conflict_path_u):
def fail(res):
- raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
+ raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
d.addCallback(fail)
else:
is_conflict = False
- db_entry = self._db.get_db_entry(relpath_u)
- dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
- dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+ db_entry = self._db.get_db_entry(item.relpath_u)
+ dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
+ dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
if db_entry:
if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
is_conflict = True
self._count('objects_conflicted')
- elif self._is_upload_pending(relpath_u):
+ elif self._is_upload_pending(item.relpath_u):
is_conflict = True
self._count('objects_conflicted')
- if relpath_u.endswith(u"/"):
- if metadata.get('deleted', False):
+ if item.relpath_u.endswith(u"/"):
+ if item.metadata.get('deleted', False):
self._log("rmdir(%r) ignored" % (abspath_u,))
else:
self._log("mkdir(%r)" % (abspath_u,))
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
- if metadata.get('deleted', False):
+ if item.metadata.get('deleted', False):
d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
else:
- d.addCallback(lambda ign: file_node.download_best_version())
+ d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
is_conflict=is_conflict))