From: meejah Date: Thu, 12 Nov 2015 23:16:28 +0000 (-0700) Subject: Flesh out "tahoe magic-folder status" command X-Git-Url: https://git.rkrishnan.org/pf/content/somewhere?a=commitdiff_plain;h=144d31b4c355d87a1a4b7cf5ed5b3283fdca792e;p=tahoe-lafs%2Ftahoe-lafs.git Flesh out "tahoe magic-folder status" command Adds: - a JSON endpoint - CLI to display information - QueuedItem + IQueuedItem for uploader/downloader - IProgress interface + PercentProgress implementation - progress= args to many upload/download APIs --- diff --git a/src/allmydata/blacklist.py b/src/allmydata/blacklist.py index 9652c702..f0af41df 100644 --- a/src/allmydata/blacklist.py +++ b/src/allmydata/blacklist.py @@ -130,7 +130,7 @@ class ProhibitedNode: def get_best_readable_version(self): raise FileProhibited(self.reason) - def download_best_version(self): + def download_best_version(self, progress=None): raise FileProhibited(self.reason) def get_best_mutable_version(self): diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 9a0a5de8..a919d9a1 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -515,6 +515,7 @@ class Client(node.Node, pollmixin.PollMixin): from allmydata.frontends import magic_folder umask = self.get_config("magic_folder", "download.umask", 0077) s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask) + self._magic_folder = s s.setServiceParent(self) s.startService() diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index 5fddec41..4c17c663 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -588,7 +588,7 @@ class DirectoryNode: return d - def add_file(self, namex, uploadable, metadata=None, overwrite=True): + def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None): """I upload a file (using the given IUploadable), then attach the resulting FileNode to the directory at the given name. I return a Deferred that fires (with the IFileNode of the uploaded file) when @@ -596,7 +596,7 @@ class DirectoryNode: name = normalize(namex) if self.is_readonly(): return defer.fail(NotWriteableError()) - d = self._uploader.upload(uploadable) + d = self._uploader.upload(uploadable, progress=progress) d.addCallback(lambda results: self._create_and_validate_node(results.get_uri(), None, name)) diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index afe26081..8a2b07e1 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -15,6 +15,7 @@ from allmydata.util import log from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError from allmydata.util.assertutil import precondition, _assert from allmydata.util.deferredutil import HookMixin +from allmydata.util.progress import PercentProgress from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ extend_filepath, unicode_from_filepath, unicode_segments_from, \ quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError @@ -126,10 +127,21 @@ class QueueMixin(HookMixin): % quote_local_unicode_path(self._local_path_u)) self._deque = deque() + # do we also want to bound on "maximum age"? + self._process_history = deque(maxlen=10) self._lazy_tail = defer.succeed(None) self._stopped = False self._turn_delay = 0 + def get_status(self): + """ + Returns an iterable of instances that implement IQueuedItem + """ + for item in self._deque: + yield item + for item in self._process_history: + yield item + def _get_filepath(self, relpath_u): self._log("_get_filepath(%r)" % (relpath_u,)) return extend_filepath(self._local_filepath, relpath_u.split(u"/")) @@ -162,8 +174,10 @@ class QueueMixin(HookMixin): self._log("stopped") return try: - item = self._deque.pop() - self._log("popped %r" % (item,)) + item = IQueuedItem(self._deque.pop()) + self._process_history.append(item) + + self._log("popped %r, now have %d" % (item, len(self._deque))) self._count('objects_queued', -1) except IndexError: self._log("deque is now empty") @@ -177,6 +191,7 @@ class QueueMixin(HookMixin): self._lazy_tail.addBoth(self._logcb, "got past _process") self._lazy_tail.addBoth(self._call_hook, 'processed', async=True) self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,)) + self._lazy_tail.addErrback(log.err) self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque)) self._lazy_tail.addBoth(self._logcb, "got past deferLater") except Exception as e: @@ -184,6 +199,44 @@ class QueueMixin(HookMixin): raise +from zope.interface import Interface, implementer + +class IQueuedItem(Interface): + pass + + +@implementer(IQueuedItem) +class QueuedItem(object): + def __init__(self, relpath_u, progress): + self.relpath_u = relpath_u + self.progress = progress + self._status_history = dict() + + def set_status(self, status, current_time=None): + if current_time is None: + current_time = time.time() + self._status_history[status] = current_time + + def status_time(self, state): + """ + Returns None if there's no status-update for 'state', else returns + the timestamp when that state was reached. + """ + return self._status_history.get(state, None) + + def status_history(self): + """ + Returns a list of 2-tuples of (state, timestamp) sorted by timestamp + """ + hist = self._status_history.items() + hist.sort(lambda a, b: cmp(a[1], b[1])) + return hist + + +class UploadItem(QueuedItem): + pass + + class Uploader(QueueMixin): def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate=False): @@ -256,7 +309,12 @@ class Uploader(QueueMixin): def _extend_queue_and_keep_going(self, relpaths_u): self._log("_extend_queue_and_keep_going %r" % (relpaths_u,)) - self._deque.extend(relpaths_u) + for relpath_u in relpaths_u: + progress = PercentProgress() + item = UploadItem(relpath_u, progress) + item.set_status('queued', self._clock.seconds()) + self._deque.append(item) + self._count('objects_queued', len(relpaths_u)) if self.is_ready: @@ -273,7 +331,7 @@ class Uploader(QueueMixin): self._extend_queue_and_keep_going(self._pending) def _add_pending(self, relpath_u): - self._log("add pending %r" % (relpath_u,)) + self._log("add pending %r" % (relpath_u,)) if not magicpath.should_ignore_file(relpath_u): self._pending.add(relpath_u) @@ -327,10 +385,14 @@ class Uploader(QueueMixin): def _when_queue_is_empty(self): return defer.succeed(None) - def _process(self, relpath_u): + def _process(self, item): # Uploader + relpath_u = item.relpath_u self._log("_process(%r)" % (relpath_u,)) + item.set_status('started', self._clock.seconds()) + if relpath_u is None: + item.set_status('invalid_path', self._clock.seconds()) return precondition(isinstance(relpath_u, unicode), relpath_u) precondition(not relpath_u.endswith(u'/'), relpath_u) @@ -374,8 +436,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri empty_uploadable = Data("", self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, empty_uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -397,7 +463,12 @@ class Uploader(QueueMixin): uploadable = Data("", self._client.convergence) encoded_path_u += magicpath.path2magic(u"/") self._log("encoded_path_u = %r" % (encoded_path_u,)) - upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True) + upload_d = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata={"version": 0}, + overwrite=True, + progress=item.progress, + ) def _dir_succeeded(ign): self._log("created subdirectory %r" % (relpath_u,)) self._count('directories_created') @@ -428,8 +499,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri uploadable = FileName(unicode_from_filepath(fp), self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -448,10 +523,12 @@ class Uploader(QueueMixin): def _succeeded(res): self._count('objects_succeeded') + item.set_status('success', self._clock.seconds()) return res def _failed(f): self._count('objects_failed') self._log("%s while processing %r" % (f, relpath_u)) + item.set_status('failure', self._clock.seconds()) return f d.addCallbacks(_succeeded, _failed) return d @@ -540,6 +617,13 @@ class WriteFileMixin(object): return abspath_u +class DownloadItem(QueuedItem): + def __init__(self, relpath_u, progress, filenode, metadata): + super(DownloadItem, self).__init__(relpath_u, progress) + self.file_node = filenode + self.metadata = metadata + + class Downloader(QueueMixin, WriteFileMixin): REMOTE_SCAN_INTERVAL = 3 # facilitates tests @@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin): def start_downloading(self): self._log("start_downloading") + self._turn_delay = self.REMOTE_SCAN_INTERVAL files = self._db.get_all_relpaths() self._log("all files %s" % files) @@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin): file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version']) if self._should_download(relpath_u, metadata['version']): - self._deque.append( (relpath_u, file_node, metadata) ) + to_dl = DownloadItem( + relpath_u, + PercentProgress(file_node.get_size()), + file_node, + metadata, + ) + to_dl.set_status('queued', self._clock.seconds()) + self._deque.append(to_dl) else: self._log("Excluding %r" % (relpath_u,)) self._call_hook(None, 'processed', async=True) @@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin): def _process(self, item, now=None): # Downloader self._log("_process(%r)" % (item,)) - if now is None: - now = time.time() - (relpath_u, file_node, metadata) = item - fp = self._get_filepath(relpath_u) + if now is None: # XXX why can we pass in now? + now = time.time() # self._clock.seconds() + + self._log("started! %s" % (now,)) + item.set_status('started', now) + fp = self._get_filepath(item.relpath_u) abspath_u = unicode_from_filepath(fp) conflict_path_u = self._get_conflicted_filename(abspath_u) d = defer.succeed(None) def do_update_db(written_abspath_u): - filecap = file_node.get_uri() - last_uploaded_uri = metadata.get('last_uploaded_uri', None) + filecap = item.file_node.get_uri() + last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) last_downloaded_uri = filecap last_downloaded_timestamp = now written_pathinfo = get_pathinfo(written_abspath_u) - if not written_pathinfo.exists and not metadata.get('deleted', False): + if not written_pathinfo.exists and not item.metadata.get('deleted', False): raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u)) - self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri, - last_downloaded_uri, last_downloaded_timestamp, written_pathinfo) + self._db.did_upload_version( + item.relpath_u, item.metadata['version'], last_uploaded_uri, + last_downloaded_uri, last_downloaded_timestamp, written_pathinfo, + ) self._count('objects_downloaded') + item.set_status('success', self._clock.seconds()) + def failed(f): + item.set_status('failure', self._clock.seconds()) self._log("download failed: %s" % (str(f),)) self._count('objects_failed') return f if os.path.isfile(conflict_path_u): def fail(res): - raise ConflictError("download failed: already conflicted: %r" % (relpath_u,)) + raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,)) d.addCallback(fail) else: is_conflict = False - db_entry = self._db.get_db_entry(relpath_u) - dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None) - dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None) + db_entry = self._db.get_db_entry(item.relpath_u) + dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None) + dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) if db_entry: if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None: if dmd_last_downloaded_uri != db_entry.last_downloaded_uri: @@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin): elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri: is_conflict = True self._count('objects_conflicted') - elif self._is_upload_pending(relpath_u): + elif self._is_upload_pending(item.relpath_u): is_conflict = True self._count('objects_conflicted') - if relpath_u.endswith(u"/"): - if metadata.get('deleted', False): + if item.relpath_u.endswith(u"/"): + if item.metadata.get('deleted', False): self._log("rmdir(%r) ignored" % (abspath_u,)) else: self._log("mkdir(%r)" % (abspath_u,)) d.addCallback(lambda ign: fileutil.make_dirs(abspath_u)) d.addCallback(lambda ign: abspath_u) else: - if metadata.get('deleted', False): + if item.metadata.get('deleted', False): d.addCallback(lambda ign: self._rename_deleted_file(abspath_u)) else: - d.addCallback(lambda ign: file_node.download_best_version()) + d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress)) d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=is_conflict)) diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index cf308dd3..8cc32d35 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -74,7 +74,7 @@ PiB=1024*TiB class Encoder(object): implements(IEncoder) - def __init__(self, log_parent=None, upload_status=None): + def __init__(self, log_parent=None, upload_status=None, progress=None): object.__init__(self) self.uri_extension_data = {} self._codec = None @@ -86,6 +86,7 @@ class Encoder(object): self._log_number = log.msg("creating Encoder %s" % self, facility="tahoe.encoder", parent=log_parent) self._aborted = False + self._progress = progress def __repr__(self): if hasattr(self, "_storage_index"): @@ -105,6 +106,8 @@ class Encoder(object): def _got_size(size): self.log(format="file size: %(size)d", size=size) self.file_size = size + if self._progress: + self._progress.set_progress_total(self.file_size) d.addCallback(_got_size) d.addCallback(lambda res: eu.get_all_encoding_parameters()) d.addCallback(self._got_all_encoding_parameters) @@ -436,6 +439,7 @@ class Encoder(object): shareid = shareids[i] d = self.send_block(shareid, segnum, block, lognum) dl.append(d) + block_hash = hashutil.block_hash(block) #from allmydata.util import base32 #log.msg("creating block (shareid=%d, blocknum=%d) " @@ -445,6 +449,14 @@ class Encoder(object): self.block_hashes[shareid].append(block_hash) dl = self._gather_responses(dl) + + def do_progress(ign): + done = self.segment_size * (segnum + 1) + if self._progress: + self._progress.set_progress(done) + return ign + dl.addCallback(do_progress) + def _logit(res): self.log("%s uploaded %s / %s bytes (%d%%) of your file." % (self, diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 1c3780ba..77997767 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -8,7 +8,7 @@ from twisted.internet import defer from allmydata import uri from twisted.internet.interfaces import IConsumer from allmydata.interfaces import IImmutableFileNode, IUploadResults -from allmydata.util import consumer +from allmydata.util import consumer, progress from allmydata.check_results import CheckResults, CheckAndRepairResults from allmydata.util.dictutil import DictOfSets from allmydata.util.happinessutil import servers_of_happiness @@ -245,11 +245,13 @@ class ImmutableFileNode: # we keep it here, we should also put this on CiphertextFileNode def __hash__(self): return self.u.__hash__() + def __eq__(self, other): if isinstance(other, ImmutableFileNode): return self.u.__eq__(other.u) else: return False + def __ne__(self, other): if isinstance(other, ImmutableFileNode): return self.u.__eq__(other.u) @@ -273,12 +275,16 @@ class ImmutableFileNode: def get_uri(self): return self.u.to_string() + def get_cap(self): return self.u + def get_readcap(self): return self.u.get_readonly() + def get_verify_cap(self): return self.u.get_verify_cap() + def get_repair_cap(self): # CHK files can be repaired with just the verifycap return self.u.get_verify_cap() @@ -288,6 +294,7 @@ class ImmutableFileNode: def get_size(self): return self.u.get_size() + def get_current_size(self): return defer.succeed(self.get_size()) @@ -305,6 +312,7 @@ class ImmutableFileNode: def check_and_repair(self, monitor, verify=False, add_lease=False): return self._cnode.check_and_repair(monitor, verify, add_lease) + def check(self, monitor, verify=False, add_lease=False): return self._cnode.check(monitor, verify, add_lease) @@ -316,14 +324,13 @@ class ImmutableFileNode: """ return defer.succeed(self) - - def download_best_version(self): + def download_best_version(self, progress=None): """ Download the best version of this file, returning its contents as a bytestring. Since there is only one version of an immutable file, we download and return the contents of this file. """ - d = consumer.download_to_data(self) + d = consumer.download_to_data(self, progress=progress) return d # for an immutable file, download_to_data (specified in IReadable) diff --git a/src/allmydata/immutable/literal.py b/src/allmydata/immutable/literal.py index dc77b4ec..b18dccb6 100644 --- a/src/allmydata/immutable/literal.py +++ b/src/allmydata/immutable/literal.py @@ -113,7 +113,10 @@ class LiteralFileNode(_ImmutableFileNodeBase): return defer.succeed(self) - def download_best_version(self): + def download_best_version(self, progress=None): + if progress is not None: + progress.set_progress_total(len(self.u.data)) + progress.set_progress(len(self.u.data)) return defer.succeed(self.u.data) diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index 84390b1d..937dceae 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -137,7 +137,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): def __init__(self, storage_index, helper, storage_broker, secret_holder, incoming_file, encoding_file, - log_number): + log_number, progress=None): + upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress) self._storage_index = storage_index self._helper = helper self._incoming_file = incoming_file diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index c6324046..99168f4e 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -21,7 +21,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ NoServersError, InsufficientVersionError, UploadUnhappinessError, \ - DEFAULT_MAX_SEGMENT_SIZE + DEFAULT_MAX_SEGMENT_SIZE, IProgress from allmydata.immutable import layout from pycryptopp.cipher.aes import AES @@ -623,7 +623,7 @@ class EncryptAnUploadable: implements(IEncryptedUploadable) CHUNKSIZE = 50*1024 - def __init__(self, original, log_parent=None): + def __init__(self, original, log_parent=None, progress=None): precondition(original.default_params_set, "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,)) self.original = IUploadable(original) @@ -636,6 +636,7 @@ class EncryptAnUploadable: self._file_size = None self._ciphertext_bytes_read = 0 self._status = None + self._progress = progress def set_upload_status(self, upload_status): self._status = IUploadStatus(upload_status) @@ -656,6 +657,8 @@ class EncryptAnUploadable: self._file_size = size if self._status: self._status.set_size(size) + if self._progress: + self._progress.set_progress_total(size) return size d.addCallback(_got_size) return d @@ -894,7 +897,7 @@ class UploadStatus: class CHKUploader: server_selector_class = Tahoe2ServerSelector - def __init__(self, storage_broker, secret_holder): + def __init__(self, storage_broker, secret_holder, progress=None): # server_selector needs storage_broker and secret_holder self._storage_broker = storage_broker self._secret_holder = secret_holder @@ -904,6 +907,7 @@ class CHKUploader: self._upload_status = UploadStatus() self._upload_status.set_helper(False) self._upload_status.set_active(True) + self._progress = progress # locate_all_shareholders() will create the following attribute: # self._server_trackers = {} # k: shnum, v: instance of ServerTracker @@ -947,8 +951,11 @@ class CHKUploader: eu = IEncryptedUploadable(encrypted) started = time.time() - self._encoder = e = encode.Encoder(self._log_number, - self._upload_status) + self._encoder = e = encode.Encoder( + self._log_number, + self._upload_status, + progress=self._progress, + ) d = e.set_encrypted_uploadable(eu) d.addCallback(self.locate_all_shareholders, started) d.addCallback(self.set_shareholders, e) @@ -1073,12 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]): class LiteralUploader: - def __init__(self): + def __init__(self, progress=None): self._status = s = UploadStatus() s.set_storage_index(None) s.set_helper(False) s.set_progress(0, 1.0) s.set_active(False) + self._progress = progress def start(self, uploadable): uploadable = IUploadable(uploadable) @@ -1086,6 +1094,8 @@ class LiteralUploader: def _got_size(size): self._size = size self._status.set_size(size) + if self._progress: + self._progress.set_progress_total(size) return read_this_many_bytes(uploadable, size) d.addCallback(_got_size) d.addCallback(lambda data: uri.LiteralFileURI("".join(data))) @@ -1109,6 +1119,8 @@ class LiteralUploader: self._status.set_progress(1, 1.0) self._status.set_progress(2, 1.0) self._status.set_results(ur) + if self._progress: + self._progress.set_progress(self._size) return ur def close(self): @@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): name = "uploader" URI_LIT_SIZE_THRESHOLD = 55 - def __init__(self, helper_furl=None, stats_provider=None, history=None): + def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None): self._helper_furl = helper_furl self.stats_provider = stats_provider self._history = history self._helper = None self._all_uploads = weakref.WeakKeyDictionary() # for debugging + self._progress = progress log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload") service.MultiService.__init__(self) @@ -1542,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): return (self._helper_furl, bool(self._helper)) - def upload(self, uploadable): + def upload(self, uploadable, progress=None): """ Returns a Deferred that will fire with the UploadResults instance. """ assert self.parent assert self.running + assert progress is None or IProgress.providedBy(progress) uploadable = IUploadable(uploadable) d = uploadable.get_size() @@ -1556,13 +1570,15 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): precondition(isinstance(default_params, dict), default_params) precondition("max_segment_size" in default_params, default_params) uploadable.set_default_encoding_parameters(default_params) + if progress: + progress.set_progress_total(size) if self.stats_provider: self.stats_provider.count('uploader.files_uploaded', 1) self.stats_provider.count('uploader.bytes_uploaded', size) if size <= self.URI_LIT_SIZE_THRESHOLD: - uploader = LiteralUploader() + uploader = LiteralUploader(progress=progress) return uploader.start(uploadable) else: eu = EncryptAnUploadable(uploadable, self._parentmsgid) @@ -1575,7 +1591,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin): else: storage_broker = self.parent.get_storage_broker() secret_holder = self.parent._secret_holder - uploader = CHKUploader(storage_broker, secret_holder) + uploader = CHKUploader(storage_broker, secret_holder, progress=progress) d2.addCallback(lambda x: uploader.start(eu)) self._all_uploads[uploader] = None diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 31fc6144..40ef040b 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1,5 +1,5 @@ -from zope.interface import Interface +from zope.interface import Interface, Attribute from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \ ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable @@ -624,6 +624,38 @@ class MustNotBeUnknownRWError(CapConstraintError): """Cannot add an unknown child cap specified in a rw_uri field.""" +class IProgress(Interface): + """ + Remembers progress measured in arbitrary units. Users of these + instances must call ``set_progress_total`` at least once before + progress can be valid, and must use the same units for both + ``set_progress_total`` and ``set_progress calls``. + + See also: + :class:`allmydata.util.progress.PercentProgress` + """ + + progress = Attribute( + "Current amount of progress (in percentage)" + ) + + def set_progress(self, value): + """ + Sets the current amount of progress. + + Arbitrary units, but must match units used for + set_progress_total. + """ + + def set_progress_total(self, value): + """ + Sets the total amount of expected progress + + Arbitrary units, but must be same units as used when calling + set_progress() on this instance).. + """ + + class IReadable(Interface): """I represent a readable object -- either an immutable file, or a specific version of a mutable file. @@ -653,9 +685,12 @@ class IReadable(Interface): def get_size(): """Return the length (in bytes) of this readable object.""" - def download_to_data(): + def download_to_data(progress=None): """Download all of the file contents. I return a Deferred that fires - with the contents as a byte string.""" + with the contents as a byte string. + + :param progress: None or IProgress implementer + """ def read(consumer, offset=0, size=None): """Download a portion (possibly all) of the file's contents, making @@ -915,11 +950,13 @@ class IFileNode(IFilesystemNode): the Deferred will errback with an UnrecoverableFileError. """ - def download_best_version(): + def download_best_version(progress=None): """Download the contents of the version that would be returned by get_best_readable_version(). This is equivalent to calling download_to_data() on the IReadable given by that method. + progress is anything that implements IProgress + I return a Deferred that fires with a byte string when the file has been fully downloaded. To support streaming download, use the 'read' method of IReadable. If no version is recoverable, @@ -1065,7 +1102,7 @@ class IMutableFileNode(IFileNode): everything) to get increased visibility. """ - def upload(new_contents, servermap): + def upload(new_contents, servermap, progress=None): """Replace the contents of the file with new ones. This requires a servermap that was previously updated with MODE_WRITE. @@ -1086,6 +1123,8 @@ class IMutableFileNode(IFileNode): operation. If I do not signal UncoordinatedWriteError, then I was able to write the new version without incident. + ``progress`` is either None or an IProgress provider + I return a Deferred that fires (with a PublishStatus object) when the publish has completed. I will update the servermap in-place with the location of all new shares. @@ -1276,12 +1315,14 @@ class IDirectoryNode(IFilesystemNode): equivalent to calling set_node() multiple times, but is much more efficient.""" - def add_file(name, uploadable, metadata=None, overwrite=True): + def add_file(name, uploadable, metadata=None, overwrite=True, progress=None): """I upload a file (using the given IUploadable), then attach the resulting ImmutableFileNode to the directory at the given name. I set metadata the same way as set_uri and set_node. The child name must be a unicode string. + ``progress`` either provides IProgress or is None + I return a Deferred that fires (with the IFileNode of the uploaded file) when the operation completes.""" diff --git a/src/allmydata/magicfolderdb.py b/src/allmydata/magicfolderdb.py index 982b5fe2..661c774d 100644 --- a/src/allmydata/magicfolderdb.py +++ b/src/allmydata/magicfolderdb.py @@ -65,6 +65,7 @@ class MagicFolderDB(object): (relpath_u,)) row = self.cursor.fetchone() if not row: + print "found nothing for", relpath_u return None else: (size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py index 50bbdd34..084c28f6 100644 --- a/src/allmydata/mutable/filenode.py +++ b/src/allmydata/mutable/filenode.py @@ -403,21 +403,21 @@ class MutableFileNode: return d.addCallback(_get_version, version) - def download_best_version(self): + def download_best_version(self, progress=None): """ I return a Deferred that fires with the contents of the best version of this mutable file. """ - return self._do_serialized(self._download_best_version) + return self._do_serialized(self._download_best_version, progress=progress) - def _download_best_version(self): + def _download_best_version(self, progress=None): """ I am the serialized sibling of download_best_version. """ d = self.get_best_readable_version() d.addCallback(self._record_size) - d.addCallback(lambda version: version.download_to_data()) + d.addCallback(lambda version: version.download_to_data(progress=progress)) # It is possible that the download will fail because there # aren't enough shares to be had. If so, we will try again after @@ -432,7 +432,7 @@ class MutableFileNode: d = self.get_best_mutable_version() d.addCallback(self._record_size) - d.addCallback(lambda version: version.download_to_data()) + d.addCallback(lambda version: version.download_to_data(progress=progress)) return d d.addErrback(_maybe_retry) @@ -935,13 +935,13 @@ class MutableFileVersion: return self._servermap.size_of_version(self._version) - def download_to_data(self, fetch_privkey=False): + def download_to_data(self, fetch_privkey=False, progress=None): """ I return a Deferred that fires with the contents of this readable object as a byte string. """ - c = consumer.MemoryConsumer() + c = consumer.MemoryConsumer(progress=progress) d = self.read(c, fetch_privkey=fetch_privkey) d.addCallback(lambda mc: "".join(mc.chunks)) return d diff --git a/src/allmydata/scripts/common_http.py b/src/allmydata/scripts/common_http.py index 7b965525..1964672b 100644 --- a/src/allmydata/scripts/common_http.py +++ b/src/allmydata/scripts/common_http.py @@ -31,6 +31,7 @@ class BadResponse(object): def __init__(self, url, err): self.status = -1 self.reason = "Error trying to connect to %s: %s" % (url, err) + self.error = err def read(self): return "" diff --git a/src/allmydata/scripts/magic_folder_cli.py b/src/allmydata/scripts/magic_folder_cli.py index 5d5c61dc..4c9a469b 100644 --- a/src/allmydata/scripts/magic_folder_cli.py +++ b/src/allmydata/scripts/magic_folder_cli.py @@ -1,7 +1,13 @@ import os +import urllib +from sys import stderr from types import NoneType from cStringIO import StringIO +from datetime import datetime + +import humanize +import simplejson from twisted.python import usage @@ -12,10 +18,13 @@ from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions import tahoe_mv from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \ quote_local_unicode_path +from allmydata.scripts.common_http import do_http, format_http_success, \ + format_http_error, BadResponse from allmydata.util import fileutil from allmydata.util import configutil from allmydata import uri + INVITE_SEPARATOR = "+" class CreateOptions(BasedirOptions): @@ -200,21 +209,167 @@ class StatusOptions(BasedirOptions): nickname = None synopsis = "" stdin = StringIO("") + def parseArgs(self): BasedirOptions.parseArgs(self) node_url_file = os.path.join(self['node-directory'], u"node.url") - self['node-url'] = open(node_url_file, "r").read().strip() + with open(node_url_file, "r") as f: + self['node-url'] = f.read().strip() + + +def _get_json_for_fragment(options, fragment): + nodeurl = options['node-url'] + if nodeurl.endswith('/'): + nodeurl = nodeurl[:-1] + + url = u'%s/%s' % (nodeurl, fragment) + resp = do_http(method, url) + if isinstance(resp, BadResponse): + # specifically NOT using format_http_error() here because the + # URL is pretty sensitive (we're doing /uri/). + raise RuntimeError( + "Failed to get json from '%s': %s" % (nodeurl, resp.error) + ) + + data = resp.read() + parsed = simplejson.loads(data) + if not parsed: + raise RuntimeError("No data from '%s'" % (nodeurl,)) + return parsed + + +def _get_json_for_cap(options, cap): + return _get_json_for_fragment( + options, + 'uri/%s?t=json' % urllib.quote(cap), + ) + +def _print_item_status(item, now, longest): + paddedname = (' ' * (longest - len(item['path']))) + item['path'] + if 'failure_at' in item: + ts = datetime.fromtimestamp(item['started_at']) + prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts) + elif item['percent_done'] < 100.0: + if 'started_at' not in item: + prog = 'not yet started' + else: + so_far = now - datetime.fromtimestamp(item['started_at']) + if so_far.seconds > 0.0: + rate = item['percent_done'] / so_far.seconds + if rate != 0: + time_left = (100.0 - item['percent_done']) / rate + prog = '%2.1f%% done, around %s left' % ( + item['percent_done'], + humanize.naturaldelta(time_left), + ) + else: + time_left = None + prog = '%2.1f%% done' % (item['percent_done'],) + else: + prog = 'just started' + else: + prog = '' + for verb in ['finished', 'started', 'queued']: + keyname = verb + '_at' + if keyname in item: + when = datetime.fromtimestamp(item[keyname]) + prog = '%s %s' % (verb, humanize.naturaltime(now - when)) + break + + print " %s: %s" % (paddedname, prog) def status(options): - # XXX todo: use http interface to ask about our magic-folder upload status + nodedir = options["node-directory"] + with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f: + dmd_cap = f.read().strip() + with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f: + collective_readcap = f.read().strip() + + try: + captype, dmd = _get_json_for_cap(options, dmd_cap) + if captype != 'dirnode': + print >>stderr, "magic_folder_dircap isn't a directory capability" + return 2 + except RuntimeError as e: + print >>stderr, str(e) + return 1 + + now = datetime.now() + + print "Local files:" + for (name, child) in dmd['children'].items(): + captype, meta = child + status = 'good' + size = meta['size'] + created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime']) + version = meta['metadata']['version'] + nice_size = humanize.naturalsize(size) + nice_created = humanize.naturaltime(now - created) + if captype != 'filenode': + print "%20s: error, should be a filecap" % name + continue + print " %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created) + + captype, collective = _get_json_for_cap(options, collective_readcap) + print + print "Remote files:" + for (name, data) in collective['children'].items(): + if data[0] != 'dirnode': + print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0]) + print " %s's remote:" % name + dmd = _get_json_for_cap(options, data[1]['ro_uri']) + if dmd[0] != 'dirnode': + print "Error: should be a dirnode" + continue + for (n, d) in dmd[1]['children'].items(): + if d[0] != 'filenode': + print "Error: expected '%s' to be a filenode." % (n,) + + meta = d[1] + status = 'good' + size = meta['size'] + created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime']) + version = meta['metadata']['version'] + nice_size = humanize.naturalsize(size) + nice_created = humanize.naturaltime(now - created) + print " %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created) + + magicdata = _get_json_for_fragment(options, 'magic_folder?t=json') + if len(magicdata): + uploads = [item for item in magicdata if item['kind'] == 'upload'] + downloads = [item for item in magicdata if item['kind'] == 'download'] + longest = max([len(item['path']) for item in magicdata]) + + if True: # maybe --show-completed option or something? + uploads = [item for item in uploads if item['status'] != 'success'] + downloads = [item for item in downloads if item['status'] != 'success'] + + if len(uploads): + print + print "Uploads:" + for item in uploads: + _print_item_status(item, now, longest) + + if len(downloads): + print + print "Downloads:" + for item in downloads: + _print_item_status(item, now, longest) + + for item in magicdata: + if item['status'] == 'failure': + print "Failed:", item + return 0 + class MagicFolderCommand(BaseOptions): subCommands = [ ["create", None, CreateOptions, "Create a Magic Folder."], ["invite", None, InviteOptions, "Invite someone to a Magic Folder."], ["join", None, JoinOptions, "Join a Magic Folder."], ["leave", None, LeaveOptions, "Leave a Magic Folder."], + ["status", None, StatusOptions, "Display stutus of uploads/downloads."], ] def postOptions(self): if not hasattr(self, 'subOptions'): @@ -234,6 +389,7 @@ subDispatch = { "invite": invite, "join": join, "leave": leave, + "status": status, } def do_magic_folder(options): diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 6bf2f974..723254b5 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -151,8 +151,8 @@ class FakeCHKFileNode: return defer.succeed(self) - def download_to_data(self): - return download_to_data(self) + def download_to_data(self, progress=None): + return download_to_data(self, progress=progress) download_best_version = download_to_data @@ -329,11 +329,11 @@ class FakeMutableFileNode: d.addCallback(_done) return d - def download_best_version(self): - return defer.succeed(self._download_best_version()) + def download_best_version(self, progress=None): + return defer.succeed(self._download_best_version(progress=progress)) - def _download_best_version(self, ignored=None): + def _download_best_version(self, ignored=None, progress=None): if isinstance(self.my_uri, uri.LiteralFileURI): return self.my_uri.data if self.storage_index not in self.all_contents: diff --git a/src/allmydata/test/test_dirnode.py b/src/allmydata/test/test_dirnode.py index c65114fb..65366709 100644 --- a/src/allmydata/test/test_dirnode.py +++ b/src/allmydata/test/test_dirnode.py @@ -1519,7 +1519,7 @@ class FakeMutableFile: def get_write_uri(self): return self.uri.to_string() - def download_best_version(self): + def download_best_version(self, progress=None): return defer.succeed(self.data) def get_writekey(self): diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index e862fced..d974e5de 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -1,5 +1,6 @@ import os, sys +import shutil from twisted.trial import unittest from twisted.internet import defer, task diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index ec329515..824f18c1 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -83,7 +83,7 @@ class FakeUploader(service.Service): helper_furl = None helper_connected = False - def upload(self, uploadable): + def upload(self, uploadable, **kw): d = uploadable.get_size() d.addCallback(lambda size: uploadable.read(size)) def _got_data(datav): diff --git a/src/allmydata/util/consumer.py b/src/allmydata/util/consumer.py index 4128c200..a48fb59d 100644 --- a/src/allmydata/util/consumer.py +++ b/src/allmydata/util/consumer.py @@ -8,9 +8,12 @@ from twisted.internet.interfaces import IConsumer class MemoryConsumer: implements(IConsumer) - def __init__(self): + + def __init__(self, progress=None): self.chunks = [] self.done = False + self._progress = progress + def registerProducer(self, p, streaming): self.producer = p if streaming: @@ -19,12 +22,19 @@ class MemoryConsumer: else: while not self.done: p.resumeProducing() + def write(self, data): self.chunks.append(data) + if self._progress is not None: + self._progress.set_progress(sum([len(c) for c in self.chunks])) + def unregisterProducer(self): self.done = True -def download_to_data(n, offset=0, size=None): - d = n.read(MemoryConsumer(), offset, size) +def download_to_data(n, offset=0, size=None, progress=None): + """ + :param on_progress: if set, a single-arg callable that receives total bytes downloaded + """ + d = n.read(MemoryConsumer(progress=progress), offset, size) d.addCallback(lambda mc: "".join(mc.chunks)) return d diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py index 9a212378..b0526304 100644 --- a/src/allmydata/util/deferredutil.py +++ b/src/allmydata/util/deferredutil.py @@ -116,7 +116,7 @@ class HookMixin: """ hook = self._hooks[name] if hook is None: - return None + return res # pass on error/result (d, ignore_count) = hook self._log("call_hook %r, ignore_count=%r" % (name, ignore_count)) diff --git a/src/allmydata/util/progress.py b/src/allmydata/util/progress.py new file mode 100644 index 00000000..3618c88d --- /dev/null +++ b/src/allmydata/util/progress.py @@ -0,0 +1,37 @@ +""" +Utilities relating to computing progress information. + +Ties in with the "consumer" module also +""" + +from allmydata.interfaces import IProgress +from zope.interface import implementer + + +@implementer(IProgress) +class PercentProgress(object): + """ + Represents progress as a percentage, from 0.0 to 100.0 + """ + + def __init__(self, total_size=None): + self._value = 0.0 + self.set_progress_total(total_size) + + def set_progress(self, value): + "IProgress API" + self._value = value + + def set_progress_total(self, size): + "IProgress API" + if size is not None: + size = float(size) + self._total_size = size + + @property + def progress(self): + if self._total_size is None: + return 0 # or 1.0? + if self._total_size <= 0.0: + return 0 + return (self._value / self._total_size) * 100.0 diff --git a/src/allmydata/web/magic_folder.py b/src/allmydata/web/magic_folder.py new file mode 100644 index 00000000..5d2f3e5e --- /dev/null +++ b/src/allmydata/web/magic_folder.py @@ -0,0 +1,60 @@ +import simplejson + +from nevow import rend, url, tags as T +from nevow.inevow import IRequest + +from allmydata.web.common import getxmlfile, get_arg, WebError + + +class MagicFolderWebApi(rend.Page): + """ + I provide the web-based API for Magic Folder status etc. + """ + + def __init__(self, client): + ##rend.Page.__init__(self, storage) + super(MagicFolderWebApi, self).__init__(client) + self.client = client + + def _render_json(self, req): + req.setHeader("content-type", "application/json") + + data = [] + for item in self.client._magic_folder.uploader.get_status(): + d = dict( + path=item.relpath_u, + status=item.status_history()[-1][0], + kind='upload', + ) + for (status, ts) in item.status_history(): + d[status + '_at'] = ts + d['percent_done'] = item.progress.progress + data.append(d) + + for item in self.client._magic_folder.downloader.get_status(): + d = dict( + path=item.relpath_u, + status=item.status_history()[-1][0], + kind='download', + ) + for (status, ts) in item.status_history(): + d[status + '_at'] = ts + d['percent_done'] = item.progress.progress + data.append(d) + + return simplejson.dumps(data) + + def renderHTTP(self, ctx): + req = IRequest(ctx) + t = get_arg(req, "t", None) + + if t is None: + return rend.Page.renderHTTP(self, ctx) + + t = t.strip() + if t == 'json': + return self._render_json(req) + + raise WebError("'%s' invalid type for 't' arg" % (t,), 400) + + diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py index d8d789cf..6b284810 100644 --- a/src/allmydata/web/root.py +++ b/src/allmydata/web/root.py @@ -12,7 +12,7 @@ from allmydata import get_package_versions_string from allmydata.util import log from allmydata.interfaces import IFileNode from allmydata.web import filenode, directory, unlinked, status, operations -from allmydata.web import storage +from allmydata.web import storage, magic_folder from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \ get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr @@ -154,6 +154,9 @@ class Root(rend.Page): self.child_uri = URIHandler(client) self.child_cap = URIHandler(client) + # handler for "/magic_folder" URIs + self.child_magic_folder = magic_folder.MagicFolderWebApi(client) + self.child_file = FileHandler(client) self.child_named = FileHandler(client) self.child_status = status.Status(client.get_history())