def get_best_readable_version(self):
raise FileProhibited(self.reason)
- def download_best_version(self):
+ def download_best_version(self, progress=None):
raise FileProhibited(self.reason)
def get_best_mutable_version(self):
from allmydata.frontends import magic_folder
umask = self.get_config("magic_folder", "download.umask", 0077)
s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
+ self._magic_folder = s
s.setServiceParent(self)
s.startService()
return d
- def add_file(self, namex, uploadable, metadata=None, overwrite=True):
+ def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
"""I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when
name = normalize(namex)
if self.is_readonly():
return defer.fail(NotWriteableError())
- d = self._uploader.upload(uploadable)
+ d = self._uploader.upload(uploadable, progress=progress)
d.addCallback(lambda results:
self._create_and_validate_node(results.get_uri(), None,
name))
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
+from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
extend_filepath, unicode_from_filepath, unicode_segments_from, \
quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
% quote_local_unicode_path(self._local_path_u))
self._deque = deque()
+        # do we also want to bound this by a "maximum age"?
+ self._process_history = deque(maxlen=10)
self._lazy_tail = defer.succeed(None)
self._stopped = False
self._turn_delay = 0
+ def get_status(self):
+ """
+ Returns an iterable of instances that implement IQueuedItem
+ """
+ for item in self._deque:
+ yield item
+ for item in self._process_history:
+ yield item
+
def _get_filepath(self, relpath_u):
self._log("_get_filepath(%r)" % (relpath_u,))
return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
self._log("stopped")
return
try:
- item = self._deque.pop()
- self._log("popped %r" % (item,))
+ item = IQueuedItem(self._deque.pop())
+ self._process_history.append(item)
+
+ self._log("popped %r, now have %d" % (item, len(self._deque)))
self._count('objects_queued', -1)
except IndexError:
self._log("deque is now empty")
self._lazy_tail.addBoth(self._logcb, "got past _process")
self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
+ self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
self._lazy_tail.addBoth(self._logcb, "got past deferLater")
except Exception as e:
raise
+from zope.interface import Interface, implementer
+
+class IQueuedItem(Interface):
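+    """
+    Marker interface for items tracked in the magic-folder queues.
+    """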
+ pass
+
+
+@implementer(IQueuedItem)
+class QueuedItem(object):
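+    """
+    A single queued upload or download, remembering its relative path,
+    an IProgress provider, and the history of states it has passed
+    through.
+    """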
+ def __init__(self, relpath_u, progress):
+ self.relpath_u = relpath_u
+ self.progress = progress
+ self._status_history = dict()
+
+ def set_status(self, status, current_time=None):
+ if current_time is None:
+ current_time = time.time()
+ self._status_history[status] = current_time
+
+ def status_time(self, state):
+ """
+ Returns None if there's no status-update for 'state', else returns
+ the timestamp when that state was reached.
+ """
+ return self._status_history.get(state, None)
+
+ def status_history(self):
+ """
+ Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
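+        e.g. [('queued', 1.0), ('started', 2.5), ('success', 3.0)]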
+ """
+        hist = self._status_history.items()
+        hist.sort(key=lambda x: x[1])
+ return hist
+
+
+class UploadItem(QueuedItem):
+ pass
+
+
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
immediate=False):
def _extend_queue_and_keep_going(self, relpaths_u):
self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
- self._deque.extend(relpaths_u)
+ for relpath_u in relpaths_u:
+ progress = PercentProgress()
+ item = UploadItem(relpath_u, progress)
+ item.set_status('queued', self._clock.seconds())
+ self._deque.append(item)
+
self._count('objects_queued', len(relpaths_u))
if self.is_ready:
self._extend_queue_and_keep_going(self._pending)
def _add_pending(self, relpath_u):
- self._log("add pending %r" % (relpath_u,))
+ self._log("add pending %r" % (relpath_u,))
if not magicpath.should_ignore_file(relpath_u):
self._pending.add(relpath_u)
def _when_queue_is_empty(self):
return defer.succeed(None)
- def _process(self, relpath_u):
+ def _process(self, item):
# Uploader
+ relpath_u = item.relpath_u
self._log("_process(%r)" % (relpath_u,))
+ item.set_status('started', self._clock.seconds())
+
if relpath_u is None:
+ item.set_status('invalid_path', self._clock.seconds())
return
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
empty_uploadable = Data("", self._client.convergence)
- d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
- metadata=metadata, overwrite=True)
+ d2 = self._upload_dirnode.add_file(
+ encoded_path_u, empty_uploadable,
+ metadata=metadata,
+ overwrite=True,
+ progress=item.progress,
+ )
def _add_db_entry(filenode):
filecap = filenode.get_uri()
uploadable = Data("", self._client.convergence)
encoded_path_u += magicpath.path2magic(u"/")
self._log("encoded_path_u = %r" % (encoded_path_u,))
- upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+ upload_d = self._upload_dirnode.add_file(
+ encoded_path_u, uploadable,
+ metadata={"version": 0},
+ overwrite=True,
+ progress=item.progress,
+ )
def _dir_succeeded(ign):
self._log("created subdirectory %r" % (relpath_u,))
self._count('directories_created')
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
- d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
- metadata=metadata, overwrite=True)
+ d2 = self._upload_dirnode.add_file(
+ encoded_path_u, uploadable,
+ metadata=metadata,
+ overwrite=True,
+ progress=item.progress,
+ )
def _add_db_entry(filenode):
filecap = filenode.get_uri()
def _succeeded(res):
self._count('objects_succeeded')
+ item.set_status('success', self._clock.seconds())
return res
def _failed(f):
self._count('objects_failed')
self._log("%s while processing %r" % (f, relpath_u))
+ item.set_status('failure', self._clock.seconds())
return f
d.addCallbacks(_succeeded, _failed)
return d
return abspath_u
+class DownloadItem(QueuedItem):
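+    """
+    A QueuedItem for a pending download: besides the relative path and
+    progress, it carries the remote filenode and its metadata so the
+    Downloader can fetch the right version.
+    """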
+ def __init__(self, relpath_u, progress, filenode, metadata):
+ super(DownloadItem, self).__init__(relpath_u, progress)
+ self.file_node = filenode
+ self.metadata = metadata
+
+
class Downloader(QueueMixin, WriteFileMixin):
REMOTE_SCAN_INTERVAL = 3 # facilitates tests
def start_downloading(self):
self._log("start_downloading")
+ self._turn_delay = self.REMOTE_SCAN_INTERVAL
files = self._db.get_all_relpaths()
self._log("all files %s" % files)
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version']):
- self._deque.append( (relpath_u, file_node, metadata) )
+ to_dl = DownloadItem(
+ relpath_u,
+ PercentProgress(file_node.get_size()),
+ file_node,
+ metadata,
+ )
+ to_dl.set_status('queued', self._clock.seconds())
+ self._deque.append(to_dl)
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True)
def _process(self, item, now=None):
# Downloader
self._log("_process(%r)" % (item,))
- if now is None:
- now = time.time()
- (relpath_u, file_node, metadata) = item
- fp = self._get_filepath(relpath_u)
+        if now is None:  # tests may pass 'now' in to control timestamps
+            now = time.time()
+
+ self._log("started! %s" % (now,))
+ item.set_status('started', now)
+ fp = self._get_filepath(item.relpath_u)
abspath_u = unicode_from_filepath(fp)
conflict_path_u = self._get_conflicted_filename(abspath_u)
d = defer.succeed(None)
def do_update_db(written_abspath_u):
- filecap = file_node.get_uri()
- last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+ filecap = item.file_node.get_uri()
+ last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
last_downloaded_uri = filecap
last_downloaded_timestamp = now
written_pathinfo = get_pathinfo(written_abspath_u)
- if not written_pathinfo.exists and not metadata.get('deleted', False):
+ if not written_pathinfo.exists and not item.metadata.get('deleted', False):
raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
- self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
- last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
+ self._db.did_upload_version(
+ item.relpath_u, item.metadata['version'], last_uploaded_uri,
+ last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
+ )
self._count('objects_downloaded')
+ item.set_status('success', self._clock.seconds())
+
def failed(f):
+ item.set_status('failure', self._clock.seconds())
self._log("download failed: %s" % (str(f),))
self._count('objects_failed')
return f
if os.path.isfile(conflict_path_u):
def fail(res):
- raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
+ raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
d.addCallback(fail)
else:
is_conflict = False
- db_entry = self._db.get_db_entry(relpath_u)
- dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
- dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+ db_entry = self._db.get_db_entry(item.relpath_u)
+ dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
+ dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
if db_entry:
if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
is_conflict = True
self._count('objects_conflicted')
- elif self._is_upload_pending(relpath_u):
+ elif self._is_upload_pending(item.relpath_u):
is_conflict = True
self._count('objects_conflicted')
- if relpath_u.endswith(u"/"):
- if metadata.get('deleted', False):
+ if item.relpath_u.endswith(u"/"):
+ if item.metadata.get('deleted', False):
self._log("rmdir(%r) ignored" % (abspath_u,))
else:
self._log("mkdir(%r)" % (abspath_u,))
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
- if metadata.get('deleted', False):
+ if item.metadata.get('deleted', False):
d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
else:
- d.addCallback(lambda ign: file_node.download_best_version())
+ d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
is_conflict=is_conflict))
class Encoder(object):
implements(IEncoder)
- def __init__(self, log_parent=None, upload_status=None):
+ def __init__(self, log_parent=None, upload_status=None, progress=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
self._log_number = log.msg("creating Encoder %s" % self,
facility="tahoe.encoder", parent=log_parent)
self._aborted = False
+ self._progress = progress
def __repr__(self):
if hasattr(self, "_storage_index"):
def _got_size(size):
self.log(format="file size: %(size)d", size=size)
self.file_size = size
+ if self._progress:
+ self._progress.set_progress_total(self.file_size)
d.addCallback(_got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
shareid = shareids[i]
d = self.send_block(shareid, segnum, block, lognum)
dl.append(d)
+
block_hash = hashutil.block_hash(block)
#from allmydata.util import base32
#log.msg("creating block (shareid=%d, blocknum=%d) "
self.block_hashes[shareid].append(block_hash)
dl = self._gather_responses(dl)
+
+        def do_progress(ign):
+            # cumulative bytes encoded so far; the final segment may be
+            # short, so clamp to the total file size
+            done = min(self.segment_size * (segnum + 1), self.file_size)
+            if self._progress:
+                self._progress.set_progress(done)
+            return ign
+        dl.addCallback(do_progress)
+
def _logit(res):
self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,
from allmydata import uri
from twisted.internet.interfaces import IConsumer
from allmydata.interfaces import IImmutableFileNode, IUploadResults
-from allmydata.util import consumer
+from allmydata.util import consumer, progress
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.util.dictutil import DictOfSets
from allmydata.util.happinessutil import servers_of_happiness
# we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()
+
def __eq__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False
+
    def __ne__(self, other):
        if isinstance(other, ImmutableFileNode):
            return not self.u.__eq__(other.u)
def get_uri(self):
return self.u.to_string()
+
def get_cap(self):
return self.u
+
def get_readcap(self):
return self.u.get_readonly()
+
def get_verify_cap(self):
return self.u.get_verify_cap()
+
def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
def get_size(self):
return self.u.get_size()
+
def get_current_size(self):
return defer.succeed(self.get_size())
def check_and_repair(self, monitor, verify=False, add_lease=False):
return self._cnode.check_and_repair(monitor, verify, add_lease)
+
def check(self, monitor, verify=False, add_lease=False):
return self._cnode.check(monitor, verify, add_lease)
"""
return defer.succeed(self)
-
- def download_best_version(self):
+ def download_best_version(self, progress=None):
"""
Download the best version of this file, returning its contents
as a bytestring. Since there is only one version of an immutable
file, we download and return the contents of this file.
"""
- d = consumer.download_to_data(self)
+ d = consumer.download_to_data(self, progress=progress)
return d
# for an immutable file, download_to_data (specified in IReadable)
return defer.succeed(self)
- def download_best_version(self):
+ def download_best_version(self, progress=None):
+ if progress is not None:
+ progress.set_progress_total(len(self.u.data))
+ progress.set_progress(len(self.u.data))
return defer.succeed(self.u.data)
def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
- log_number):
+ log_number, progress=None):
+ upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
- DEFAULT_MAX_SEGMENT_SIZE
+ DEFAULT_MAX_SEGMENT_SIZE, IProgress
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024
- def __init__(self, original, log_parent=None):
+ def __init__(self, original, log_parent=None, progress=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
+ self._progress = progress
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
self._file_size = size
if self._status:
self._status.set_size(size)
+ if self._progress:
+ self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
class CHKUploader:
server_selector_class = Tahoe2ServerSelector
- def __init__(self, storage_broker, secret_holder):
+ def __init__(self, storage_broker, secret_holder, progress=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
+ self._progress = progress
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
eu = IEncryptedUploadable(encrypted)
started = time.time()
- self._encoder = e = encode.Encoder(self._log_number,
- self._upload_status)
+ self._encoder = e = encode.Encoder(
+ self._log_number,
+ self._upload_status,
+ progress=self._progress,
+ )
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
class LiteralUploader:
- def __init__(self):
+ def __init__(self, progress=None):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
+ self._progress = progress
def start(self, uploadable):
uploadable = IUploadable(uploadable)
def _got_size(size):
self._size = size
self._status.set_size(size)
+ if self._progress:
+ self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
+ if self._progress:
+ self._progress.set_progress(self._size)
return ur
def close(self):
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
- def __init__(self, helper_furl=None, stats_provider=None, history=None):
+ def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+ self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
return (self._helper_furl, bool(self._helper))
- def upload(self, uploadable):
+ def upload(self, uploadable, progress=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
+ assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
+ if progress:
+ progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader()
+ uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
- uploader = CHKUploader(storage_broker, secret_holder)
+ uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
-from zope.interface import Interface
+from zope.interface import Interface, Attribute
from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
"""Cannot add an unknown child cap specified in a rw_uri field."""
+class IProgress(Interface):
+ """
+ Remembers progress measured in arbitrary units. Users of these
+ instances must call ``set_progress_total`` at least once before
+ progress can be valid, and must use the same units for both
+    ``set_progress_total`` and ``set_progress`` calls.
+
+ See also:
+ :class:`allmydata.util.progress.PercentProgress`
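+
+    A usage sketch (PercentProgress is one concrete implementer)::
+
+        progress = PercentProgress()
+        progress.set_progress_total(1234)   # e.g. total bytes expected
+        progress.set_progress(617)          # bytes seen so far
+        progress.progress                   # -> 50.0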
+ """
+
+ progress = Attribute(
+ "Current amount of progress (in percentage)"
+ )
+
+    def set_progress(value):
+ """
+ Sets the current amount of progress.
+
+ Arbitrary units, but must match units used for
+ set_progress_total.
+ """
+
+    def set_progress_total(value):
+ """
+ Sets the total amount of expected progress
+
+        Arbitrary units, but must be the same units as used when calling
+        set_progress() on this instance.
+ """
+
+
class IReadable(Interface):
"""I represent a readable object -- either an immutable file, or a
specific version of a mutable file.
def get_size():
"""Return the length (in bytes) of this readable object."""
- def download_to_data():
+ def download_to_data(progress=None):
"""Download all of the file contents. I return a Deferred that fires
- with the contents as a byte string."""
+ with the contents as a byte string.
+
+ :param progress: None or IProgress implementer
+ """
def read(consumer, offset=0, size=None):
"""Download a portion (possibly all) of the file's contents, making
the Deferred will errback with an UnrecoverableFileError.
"""
- def download_best_version():
+ def download_best_version(progress=None):
"""Download the contents of the version that would be returned
by get_best_readable_version(). This is equivalent to calling
download_to_data() on the IReadable given by that method.
+ progress is anything that implements IProgress
+
I return a Deferred that fires with a byte string when the file
has been fully downloaded. To support streaming download, use
the 'read' method of IReadable. If no version is recoverable,
everything) to get increased visibility.
"""
- def upload(new_contents, servermap):
+ def upload(new_contents, servermap, progress=None):
"""Replace the contents of the file with new ones. This requires a
servermap that was previously updated with MODE_WRITE.
operation. If I do not signal UncoordinatedWriteError, then I was
able to write the new version without incident.
+ ``progress`` is either None or an IProgress provider
+
I return a Deferred that fires (with a PublishStatus object) when the
publish has completed. I will update the servermap in-place with the
location of all new shares.
equivalent to calling set_node() multiple times, but is much more
efficient."""
- def add_file(name, uploadable, metadata=None, overwrite=True):
+ def add_file(name, uploadable, metadata=None, overwrite=True, progress=None):
"""I upload a file (using the given IUploadable), then attach the
resulting ImmutableFileNode to the directory at the given name. I set
metadata the same way as set_uri and set_node. The child name must be
a unicode string.
+ ``progress`` either provides IProgress or is None
+
I return a Deferred that fires (with the IFileNode of the uploaded
file) when the operation completes."""
(relpath_u,))
row = self.cursor.fetchone()
if not row:
+ print "found nothing for", relpath_u
return None
else:
(size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row
return d.addCallback(_get_version, version)
- def download_best_version(self):
+ def download_best_version(self, progress=None):
"""
I return a Deferred that fires with the contents of the best
version of this mutable file.
"""
- return self._do_serialized(self._download_best_version)
+ return self._do_serialized(self._download_best_version, progress=progress)
- def _download_best_version(self):
+ def _download_best_version(self, progress=None):
"""
I am the serialized sibling of download_best_version.
"""
d = self.get_best_readable_version()
d.addCallback(self._record_size)
- d.addCallback(lambda version: version.download_to_data())
+ d.addCallback(lambda version: version.download_to_data(progress=progress))
# It is possible that the download will fail because there
# aren't enough shares to be had. If so, we will try again after
d = self.get_best_mutable_version()
d.addCallback(self._record_size)
- d.addCallback(lambda version: version.download_to_data())
+ d.addCallback(lambda version: version.download_to_data(progress=progress))
return d
d.addErrback(_maybe_retry)
return self._servermap.size_of_version(self._version)
- def download_to_data(self, fetch_privkey=False):
+ def download_to_data(self, fetch_privkey=False, progress=None):
"""
I return a Deferred that fires with the contents of this
readable object as a byte string.
"""
- c = consumer.MemoryConsumer()
+ c = consumer.MemoryConsumer(progress=progress)
d = self.read(c, fetch_privkey=fetch_privkey)
d.addCallback(lambda mc: "".join(mc.chunks))
return d
def __init__(self, url, err):
self.status = -1
self.reason = "Error trying to connect to %s: %s" % (url, err)
+ self.error = err
def read(self):
return ""
import os
+import urllib
+from sys import stderr
from types import NoneType
from cStringIO import StringIO
+from datetime import datetime
+
+import humanize
+import simplejson
from twisted.python import usage
import tahoe_mv
from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \
quote_local_unicode_path
+from allmydata.scripts.common_http import do_http, format_http_success, \
+ format_http_error, BadResponse
from allmydata.util import fileutil
from allmydata.util import configutil
from allmydata import uri
+
INVITE_SEPARATOR = "+"
class CreateOptions(BasedirOptions):
nickname = None
synopsis = ""
stdin = StringIO("")
+
def parseArgs(self):
BasedirOptions.parseArgs(self)
node_url_file = os.path.join(self['node-directory'], u"node.url")
- self['node-url'] = open(node_url_file, "r").read().strip()
+ with open(node_url_file, "r") as f:
+ self['node-url'] = f.read().strip()
+
+
+def _get_json_for_fragment(options, fragment, method='GET'):
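+    """
+    Issue a request against the node's web API at the given URL fragment
+    (e.g. "magic_folder?t=json") and return the parsed JSON response.
+    """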
+ nodeurl = options['node-url']
+ if nodeurl.endswith('/'):
+ nodeurl = nodeurl[:-1]
+
+ url = u'%s/%s' % (nodeurl, fragment)
+ resp = do_http(method, url)
+ if isinstance(resp, BadResponse):
+ # specifically NOT using format_http_error() here because the
+ # URL is pretty sensitive (we're doing /uri/<key>).
+ raise RuntimeError(
+ "Failed to get json from '%s': %s" % (nodeurl, resp.error)
+ )
+
+ data = resp.read()
+ parsed = simplejson.loads(data)
+    if parsed is None:
+ raise RuntimeError("No data from '%s'" % (nodeurl,))
+ return parsed
+
+
+def _get_json_for_cap(options, cap):
+ return _get_json_for_fragment(
+ options,
+ 'uri/%s?t=json' % urllib.quote(cap),
+ )
+
+def _print_item_status(item, now, longest):
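+    """
+    Print a single status line: the item's path (padded to 'longest'
+    characters) followed by a human-readable progress summary.
+    """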
+ paddedname = (' ' * (longest - len(item['path']))) + item['path']
+ if 'failure_at' in item:
+        ts = datetime.fromtimestamp(item['failure_at'])
+ prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts)
+ elif item['percent_done'] < 100.0:
+ if 'started_at' not in item:
+ prog = 'not yet started'
+ else:
+ so_far = now - datetime.fromtimestamp(item['started_at'])
+            elapsed = so_far.total_seconds()
+            if elapsed > 0.0:
+                rate = item['percent_done'] / elapsed
+ if rate != 0:
+ time_left = (100.0 - item['percent_done']) / rate
+ prog = '%2.1f%% done, around %s left' % (
+ item['percent_done'],
+ humanize.naturaldelta(time_left),
+ )
+                else:
+                    prog = '%2.1f%% done' % (item['percent_done'],)
+ else:
+ prog = 'just started'
+ else:
+ prog = ''
+ for verb in ['finished', 'started', 'queued']:
+ keyname = verb + '_at'
+ if keyname in item:
+ when = datetime.fromtimestamp(item[keyname])
+ prog = '%s %s' % (verb, humanize.naturaltime(now - when))
+ break
+
+ print " %s: %s" % (paddedname, prog)
def status(options):
- # XXX todo: use http interface to ask about our magic-folder upload status
+ nodedir = options["node-directory"]
+ with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f:
+ dmd_cap = f.read().strip()
+ with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f:
+ collective_readcap = f.read().strip()
+
+ try:
+ captype, dmd = _get_json_for_cap(options, dmd_cap)
+ if captype != 'dirnode':
+ print >>stderr, "magic_folder_dircap isn't a directory capability"
+ return 2
+ except RuntimeError as e:
+ print >>stderr, str(e)
+ return 1
+
+ now = datetime.now()
+
+ print "Local files:"
+ for (name, child) in dmd['children'].items():
+ captype, meta = child
+ status = 'good'
+ size = meta['size']
+ created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+ version = meta['metadata']['version']
+ nice_size = humanize.naturalsize(size)
+ nice_created = humanize.naturaltime(now - created)
+ if captype != 'filenode':
+ print "%20s: error, should be a filecap" % name
+ continue
+ print " %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created)
+
+ captype, collective = _get_json_for_cap(options, collective_readcap)
+ print
+ print "Remote files:"
+ for (name, data) in collective['children'].items():
+        if data[0] != 'dirnode':
+            print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0])
+            continue
+ print " %s's remote:" % name
+ dmd = _get_json_for_cap(options, data[1]['ro_uri'])
+ if dmd[0] != 'dirnode':
+ print "Error: should be a dirnode"
+ continue
+ for (n, d) in dmd[1]['children'].items():
+            if d[0] != 'filenode':
+                print "Error: expected '%s' to be a filenode." % (n,)
+                continue
+
+ meta = d[1]
+ status = 'good'
+ size = meta['size']
+ created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+ version = meta['metadata']['version']
+ nice_size = humanize.naturalsize(size)
+ nice_created = humanize.naturaltime(now - created)
+ print " %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created)
+
+ magicdata = _get_json_for_fragment(options, 'magic_folder?t=json')
+ if len(magicdata):
+ uploads = [item for item in magicdata if item['kind'] == 'upload']
+ downloads = [item for item in magicdata if item['kind'] == 'download']
+ longest = max([len(item['path']) for item in magicdata])
+
+ if True: # maybe --show-completed option or something?
+ uploads = [item for item in uploads if item['status'] != 'success']
+ downloads = [item for item in downloads if item['status'] != 'success']
+
+ if len(uploads):
+ print
+ print "Uploads:"
+ for item in uploads:
+ _print_item_status(item, now, longest)
+
+ if len(downloads):
+ print
+ print "Downloads:"
+ for item in downloads:
+ _print_item_status(item, now, longest)
+
+ for item in magicdata:
+ if item['status'] == 'failure':
+ print "Failed:", item
+
return 0
+
class MagicFolderCommand(BaseOptions):
subCommands = [
["create", None, CreateOptions, "Create a Magic Folder."],
["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
["join", None, JoinOptions, "Join a Magic Folder."],
["leave", None, LeaveOptions, "Leave a Magic Folder."],
+ ["status", None, StatusOptions, "Display stutus of uploads/downloads."],
]
def postOptions(self):
if not hasattr(self, 'subOptions'):
"invite": invite,
"join": join,
"leave": leave,
+ "status": status,
}
def do_magic_folder(options):
return defer.succeed(self)
- def download_to_data(self):
- return download_to_data(self)
+ def download_to_data(self, progress=None):
+ return download_to_data(self, progress=progress)
download_best_version = download_to_data
d.addCallback(_done)
return d
- def download_best_version(self):
- return defer.succeed(self._download_best_version())
+ def download_best_version(self, progress=None):
+ return defer.succeed(self._download_best_version(progress=progress))
- def _download_best_version(self, ignored=None):
+ def _download_best_version(self, ignored=None, progress=None):
if isinstance(self.my_uri, uri.LiteralFileURI):
return self.my_uri.data
if self.storage_index not in self.all_contents:
def get_write_uri(self):
return self.uri.to_string()
- def download_best_version(self):
+ def download_best_version(self, progress=None):
return defer.succeed(self.data)
def get_writekey(self):
import os, sys
+import shutil
from twisted.trial import unittest
from twisted.internet import defer, task
helper_furl = None
helper_connected = False
- def upload(self, uploadable):
+ def upload(self, uploadable, **kw):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav):
class MemoryConsumer:
implements(IConsumer)
- def __init__(self):
+
+ def __init__(self, progress=None):
self.chunks = []
self.done = False
+ self._progress = progress
+
def registerProducer(self, p, streaming):
self.producer = p
if streaming:
else:
while not self.done:
p.resumeProducing()
+
def write(self, data):
self.chunks.append(data)
+ if self._progress is not None:
+ self._progress.set_progress(sum([len(c) for c in self.chunks]))
+
def unregisterProducer(self):
self.done = True
-def download_to_data(n, offset=0, size=None):
- d = n.read(MemoryConsumer(), offset, size)
+def download_to_data(n, offset=0, size=None, progress=None):
+ """
+    :param progress: None or an IProgress provider; it is fed the
+        cumulative number of bytes downloaded via set_progress()
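+
+    A sketch of byte-level progress tracking (the caller supplies the
+    expected total size)::
+
+        p = PercentProgress(expected_size)
+        d = download_to_data(n, progress=p)
+        # p.progress climbs from 0.0 to 100.0 as chunks arrive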
+ """
+ d = n.read(MemoryConsumer(progress=progress), offset, size)
d.addCallback(lambda mc: "".join(mc.chunks))
return d
"""
hook = self._hooks[name]
if hook is None:
- return None
+ return res # pass on error/result
(d, ignore_count) = hook
self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
--- /dev/null
+"""
+Utilities relating to computing progress information.
+
+Ties in with the "consumer" module also
+"""
+
+from allmydata.interfaces import IProgress
+from zope.interface import implementer
+
+
+@implementer(IProgress)
+class PercentProgress(object):
+ """
+ Represents progress as a percentage, from 0.0 to 100.0
+ """
+
+ def __init__(self, total_size=None):
+ self._value = 0.0
+ self.set_progress_total(total_size)
+
+ def set_progress(self, value):
+ "IProgress API"
+ self._value = value
+
+ def set_progress_total(self, size):
+ "IProgress API"
+ if size is not None:
+ size = float(size)
+ self._total_size = size
+
+ @property
+ def progress(self):
+        if self._total_size is None:
+            return 0  # no total yet, so no meaningful percentage
+ if self._total_size <= 0.0:
+ return 0
+ return (self._value / self._total_size) * 100.0
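+
+
+# A minimal usage sketch: set the total, feed absolute values, and read
+# back a percentage:
+#
+#   p = PercentProgress(total_size=200)
+#   p.set_progress(50)
+#   p.progress  # -> 25.0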
--- /dev/null
+import simplejson
+
+from nevow import rend
+from nevow.inevow import IRequest
+
+from allmydata.web.common import get_arg, WebError
+
+
+class MagicFolderWebApi(rend.Page):
+ """
+ I provide the web-based API for Magic Folder status etc.
+ """
+
+ def __init__(self, client):
+ super(MagicFolderWebApi, self).__init__(client)
+ self.client = client
+
+ def _render_json(self, req):
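+        """
+        Return the status of every queued upload and download as a JSON
+        list: one dict per item with 'path', 'kind', 'status',
+        'percent_done' and a '<state>_at' timestamp per state reached.
+        """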
+ req.setHeader("content-type", "application/json")
+
+        data = []
+        for (kind, status_iter) in [
+            ('upload', self.client._magic_folder.uploader.get_status()),
+            ('download', self.client._magic_folder.downloader.get_status()),
+        ]:
+            for item in status_iter:
+                d = dict(
+                    path=item.relpath_u,
+                    status=item.status_history()[-1][0],
+                    kind=kind,
+                )
+                for (status, ts) in item.status_history():
+                    d[status + '_at'] = ts
+                d['percent_done'] = item.progress.progress
+                data.append(d)
+
+ return simplejson.dumps(data)
+
+ def renderHTTP(self, ctx):
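+        """
+        Dispatch on the 't' query argument: 't=json' returns the status
+        JSON; with no 't' we fall through to the normal page render.
+        """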
+ req = IRequest(ctx)
+ t = get_arg(req, "t", None)
+
+ if t is None:
+ return rend.Page.renderHTTP(self, ctx)
+
+ t = t.strip()
+ if t == 'json':
+ return self._render_json(req)
+
+ raise WebError("'%s' invalid type for 't' arg" % (t,), 400)
+
+
from allmydata.util import log
from allmydata.interfaces import IFileNode
from allmydata.web import filenode, directory, unlinked, status, operations
-from allmydata.web import storage
+from allmydata.web import storage, magic_folder
from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \
get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr
self.child_uri = URIHandler(client)
self.child_cap = URIHandler(client)
+ # handler for "/magic_folder" URIs
+ self.child_magic_folder = magic_folder.MagicFolderWebApi(client)
+
self.child_file = FileHandler(client)
self.child_named = FileHandler(client)
self.child_status = status.Status(client.get_history())