X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Ffrontends%2Fmagic_folder.py;fp=src%2Fallmydata%2Ffrontends%2Fmagic_folder.py;h=8a2b07e1be1633c06ffdfeeb05930c12607cdfb9;hb=144d31b4c355d87a1a4b7cf5ed5b3283fdca792e;hp=afe26081dc4909c134009ef00ab6c9476be4a9dc;hpb=f660aa78ab02b90b3b8edf7521ed2a2515e0e7f2;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index afe26081..8a2b07e1 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -15,6 +15,7 @@ from allmydata.util import log from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError from allmydata.util.assertutil import precondition, _assert from allmydata.util.deferredutil import HookMixin +from allmydata.util.progress import PercentProgress from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ extend_filepath, unicode_from_filepath, unicode_segments_from, \ quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError @@ -126,10 +127,21 @@ class QueueMixin(HookMixin): % quote_local_unicode_path(self._local_path_u)) self._deque = deque() + # do we also want to bound on "maximum age"? + self._process_history = deque(maxlen=10) self._lazy_tail = defer.succeed(None) self._stopped = False self._turn_delay = 0 + def get_status(self): + """ + Returns an iterable of instances that implement IQueuedItem + """ + for item in self._deque: + yield item + for item in self._process_history: + yield item + def _get_filepath(self, relpath_u): self._log("_get_filepath(%r)" % (relpath_u,)) return extend_filepath(self._local_filepath, relpath_u.split(u"/")) @@ -162,8 +174,10 @@ class QueueMixin(HookMixin): self._log("stopped") return try: - item = self._deque.pop() - self._log("popped %r" % (item,)) + item = IQueuedItem(self._deque.pop()) + self._process_history.append(item) + + self._log("popped %r, now have %d" % (item, len(self._deque))) self._count('objects_queued', -1) except IndexError: self._log("deque is now empty") @@ -177,6 +191,7 @@ class QueueMixin(HookMixin): self._lazy_tail.addBoth(self._logcb, "got past _process") self._lazy_tail.addBoth(self._call_hook, 'processed', async=True) self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,)) + self._lazy_tail.addErrback(log.err) self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque)) self._lazy_tail.addBoth(self._logcb, "got past deferLater") except Exception as e: @@ -184,6 +199,44 @@ class QueueMixin(HookMixin): raise +from zope.interface import Interface, implementer + +class IQueuedItem(Interface): + pass + + +@implementer(IQueuedItem) +class QueuedItem(object): + def __init__(self, relpath_u, progress): + self.relpath_u = relpath_u + self.progress = progress + self._status_history = dict() + + def set_status(self, status, current_time=None): + if current_time is None: + current_time = time.time() + self._status_history[status] = current_time + + def status_time(self, state): + """ + Returns None if there's no status-update for 'state', else returns + the timestamp when that state was reached. + """ + return self._status_history.get(state, None) + + def status_history(self): + """ + Returns a list of 2-tuples of (state, timestamp) sorted by timestamp + """ + hist = self._status_history.items() + hist.sort(lambda a, b: cmp(a[1], b[1])) + return hist + + +class UploadItem(QueuedItem): + pass + + class Uploader(QueueMixin): def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate=False): @@ -256,7 +309,12 @@ class Uploader(QueueMixin): def _extend_queue_and_keep_going(self, relpaths_u): self._log("_extend_queue_and_keep_going %r" % (relpaths_u,)) - self._deque.extend(relpaths_u) + for relpath_u in relpaths_u: + progress = PercentProgress() + item = UploadItem(relpath_u, progress) + item.set_status('queued', self._clock.seconds()) + self._deque.append(item) + self._count('objects_queued', len(relpaths_u)) if self.is_ready: @@ -273,7 +331,7 @@ class Uploader(QueueMixin): self._extend_queue_and_keep_going(self._pending) def _add_pending(self, relpath_u): - self._log("add pending %r" % (relpath_u,)) + self._log("add pending %r" % (relpath_u,)) if not magicpath.should_ignore_file(relpath_u): self._pending.add(relpath_u) @@ -327,10 +385,14 @@ class Uploader(QueueMixin): def _when_queue_is_empty(self): return defer.succeed(None) - def _process(self, relpath_u): + def _process(self, item): # Uploader + relpath_u = item.relpath_u self._log("_process(%r)" % (relpath_u,)) + item.set_status('started', self._clock.seconds()) + if relpath_u is None: + item.set_status('invalid_path', self._clock.seconds()) return precondition(isinstance(relpath_u, unicode), relpath_u) precondition(not relpath_u.endswith(u'/'), relpath_u) @@ -374,8 +436,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri empty_uploadable = Data("", self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, empty_uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -397,7 +463,12 @@ class Uploader(QueueMixin): uploadable = Data("", self._client.convergence) encoded_path_u += magicpath.path2magic(u"/") self._log("encoded_path_u = %r" % (encoded_path_u,)) - upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True) + upload_d = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata={"version": 0}, + overwrite=True, + progress=item.progress, + ) def _dir_succeeded(ign): self._log("created subdirectory %r" % (relpath_u,)) self._count('directories_created') @@ -428,8 +499,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri uploadable = FileName(unicode_from_filepath(fp), self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -448,10 +523,12 @@ class Uploader(QueueMixin): def _succeeded(res): self._count('objects_succeeded') + item.set_status('success', self._clock.seconds()) return res def _failed(f): self._count('objects_failed') self._log("%s while processing %r" % (f, relpath_u)) + item.set_status('failure', self._clock.seconds()) return f d.addCallbacks(_succeeded, _failed) return d @@ -540,6 +617,13 @@ class WriteFileMixin(object): return abspath_u +class DownloadItem(QueuedItem): + def __init__(self, relpath_u, progress, filenode, metadata): + super(DownloadItem, self).__init__(relpath_u, progress) + self.file_node = filenode + self.metadata = metadata + + class Downloader(QueueMixin, WriteFileMixin): REMOTE_SCAN_INTERVAL = 3 # facilitates tests @@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin): def start_downloading(self): self._log("start_downloading") + self._turn_delay = self.REMOTE_SCAN_INTERVAL files = self._db.get_all_relpaths() self._log("all files %s" % files) @@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin): file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version']) if self._should_download(relpath_u, metadata['version']): - self._deque.append( (relpath_u, file_node, metadata) ) + to_dl = DownloadItem( + relpath_u, + PercentProgress(file_node.get_size()), + file_node, + metadata, + ) + to_dl.set_status('queued', self._clock.seconds()) + self._deque.append(to_dl) else: self._log("Excluding %r" % (relpath_u,)) self._call_hook(None, 'processed', async=True) @@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin): def _process(self, item, now=None): # Downloader self._log("_process(%r)" % (item,)) - if now is None: - now = time.time() - (relpath_u, file_node, metadata) = item - fp = self._get_filepath(relpath_u) + if now is None: # XXX why can we pass in now? + now = time.time() # self._clock.seconds() + + self._log("started! %s" % (now,)) + item.set_status('started', now) + fp = self._get_filepath(item.relpath_u) abspath_u = unicode_from_filepath(fp) conflict_path_u = self._get_conflicted_filename(abspath_u) d = defer.succeed(None) def do_update_db(written_abspath_u): - filecap = file_node.get_uri() - last_uploaded_uri = metadata.get('last_uploaded_uri', None) + filecap = item.file_node.get_uri() + last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) last_downloaded_uri = filecap last_downloaded_timestamp = now written_pathinfo = get_pathinfo(written_abspath_u) - if not written_pathinfo.exists and not metadata.get('deleted', False): + if not written_pathinfo.exists and not item.metadata.get('deleted', False): raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u)) - self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri, - last_downloaded_uri, last_downloaded_timestamp, written_pathinfo) + self._db.did_upload_version( + item.relpath_u, item.metadata['version'], last_uploaded_uri, + last_downloaded_uri, last_downloaded_timestamp, written_pathinfo, + ) self._count('objects_downloaded') + item.set_status('success', self._clock.seconds()) + def failed(f): + item.set_status('failure', self._clock.seconds()) self._log("download failed: %s" % (str(f),)) self._count('objects_failed') return f if os.path.isfile(conflict_path_u): def fail(res): - raise ConflictError("download failed: already conflicted: %r" % (relpath_u,)) + raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,)) d.addCallback(fail) else: is_conflict = False - db_entry = self._db.get_db_entry(relpath_u) - dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None) - dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None) + db_entry = self._db.get_db_entry(item.relpath_u) + dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None) + dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) if db_entry: if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None: if dmd_last_downloaded_uri != db_entry.last_downloaded_uri: @@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin): elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri: is_conflict = True self._count('objects_conflicted') - elif self._is_upload_pending(relpath_u): + elif self._is_upload_pending(item.relpath_u): is_conflict = True self._count('objects_conflicted') - if relpath_u.endswith(u"/"): - if metadata.get('deleted', False): + if item.relpath_u.endswith(u"/"): + if item.metadata.get('deleted', False): self._log("rmdir(%r) ignored" % (abspath_u,)) else: self._log("mkdir(%r)" % (abspath_u,)) d.addCallback(lambda ign: fileutil.make_dirs(abspath_u)) d.addCallback(lambda ign: abspath_u) else: - if metadata.get('deleted', False): + if item.metadata.get('deleted', False): d.addCallback(lambda ign: self._rename_deleted_file(abspath_u)) else: - d.addCallback(lambda ign: file_node.download_best_version()) + d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress)) d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=is_conflict))