From: Brian Warner Date: Tue, 12 Feb 2008 22:38:39 +0000 (-0700) Subject: add download-status objects, to track download progress X-Git-Tag: allmydata-tahoe-0.8.0~90 X-Git-Url: https://git.rkrishnan.org/simplejson/%22news.html/configuration.txt?a=commitdiff_plain;h=94097affc30059b84adfee11ad614ee96b13d253;p=tahoe-lafs%2Ftahoe-lafs.git add download-status objects, to track download progress --- diff --git a/src/allmydata/download.py b/src/allmydata/download.py index a8c6a3e9..eb5cd539 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -1,5 +1,5 @@ -import os, random +import os, random, weakref from zope.interface import implements from twisted.internet import defer from twisted.internet.interfaces import IPushProducer, IConsumer @@ -9,7 +9,8 @@ from foolscap.eventual import eventually from allmydata.util import idlib, mathutil, hashutil, log from allmydata.util.assertutil import _assert from allmydata import codec, hashtree, storage, uri -from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI +from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \ + IDownloadStatus from allmydata.encode import NotEnoughPeersError from pycryptopp.cipher.aes import AES @@ -41,6 +42,9 @@ class Output: self._opened = False self._log_parent = log_parent + def get_progress(self): + return float(self.length) / self.total_length + def log(self, *args, **kwargs): if "parent" not in kwargs: kwargs["parent"] = self._log_parent @@ -321,7 +325,7 @@ class SegmentDownloader: self.parent.bucket_failed(vbucket) class FileDownloader: - implements(IPushProducer) + implements(IPushProducer, IDownloadStatus) check_crypttext_hash = True check_plaintext_hash = True @@ -337,6 +341,8 @@ class FileDownloader: self.init_logging() + self._status = "Starting" + if IConsumer.providedBy(downloadable): downloadable.registerProducer(self, True) self._downloadable = downloadable @@ -400,11 +406,13 @@ class FileDownloader: # once we know that, we can download blocks from everybody d.addCallback(self._download_all_segments) def _finished(res): + self._status = "Finished" if IConsumer.providedBy(self._downloadable): self._downloadable.unregisterProducer() return res d.addBoth(_finished) def _failed(why): + self._status = "Failed" self._output.fail(why) return why d.addErrback(_failed) @@ -418,9 +426,16 @@ class FileDownloader: d = ss.callRemote("get_buckets", self._storage_index) d.addCallbacks(self._got_response, self._got_error) dl.append(d) + self._responses_received = 0 + self._queries_sent = len(dl) + self._status = "Locating Shares (%d/%d)" % (self._responses_received, + self._queries_sent) return defer.DeferredList(dl) def _got_response(self, buckets): + self._responses_received += 1 + self._status = "Locating Shares (%d/%d)" % (self._responses_received, + self._queries_sent) for sharenum, bucket in buckets.iteritems(): b = storage.ReadBucketProxy(bucket) self.add_share_bucket(sharenum, b) @@ -448,6 +463,7 @@ class FileDownloader: def _got_all_shareholders(self, res): if len(self._share_buckets) < self._num_needed_shares: raise NotEnoughPeersError + #for s in self._share_vbuckets.values(): # for vb in s: # assert isinstance(vb, ValidatedBucket), \ @@ -461,6 +477,8 @@ class FileDownloader: # all are supposed to be identical. We compute the hash of the data # that comes back, and compare it against the version in our URI. If # they don't match, ignore their data and try someone else. + self._status = "Obtaining URI Extension" + def _validate(proposal, bucket): h = hashutil.uri_extension_hash(proposal) if h != self._uri_extension_hash: @@ -519,6 +537,7 @@ class FileDownloader: self._share_hashtree.set_hashes({0: self._roothash}) def _get_hashtrees(self, res): + self._status = "Retrieving Hash Trees" d = self._get_plaintext_hashtrees() d.addCallback(self._get_crypttext_hashtrees) d.addCallback(self._setup_hashtrees) @@ -634,6 +653,8 @@ class FileDownloader: return res def _download_segment(self, res, segnum): + self._status = "Downloading segment %d of %d" % (segnum, + self._total_segments) self.log("downloading seg#%d of %d (%d%%)" % (segnum, self._total_segments, 100.0 * segnum / self._total_segments)) @@ -709,7 +730,25 @@ class FileDownloader: got=self._output.length, expected=self._size) return self._output.finish() + def get_storage_index(self): + return self._storage_index + def get_size(self): + return self._size + def using_helper(self): + return False + def get_status(self): + status = self._status + if self._paused: + status += " (output paused)" + if self._stopped: + status += " (output stopped)" + return status + def get_progress(self): + return self._output.get_progress() + class LiteralDownloader: + implements(IDownloadStatus) + def __init__(self, client, u, downloadable): self._uri = IFileURI(u) assert isinstance(self._uri, uri.LiteralFileURI) @@ -722,6 +761,16 @@ class LiteralDownloader: self._downloadable.close() return defer.maybeDeferred(self._downloadable.finish) + def get_storage_index(self): + return None + def get_size(self): + return len(self._uri.data) + def using_helper(self): + return False + def get_status(self): + return "Done" + def get_progress(self): + return 1.0 class FileName: implements(IDownloadTarget) @@ -792,6 +841,10 @@ class Downloader(service.MultiService): implements(IDownloader) name = "downloader" + def __init__(self): + service.MultiService.__init__(self) + self._all_downloads = weakref.WeakKeyDictionary() + def download(self, u, t): assert self.parent assert self.running @@ -805,6 +858,7 @@ class Downloader(service.MultiService): dl = FileDownloader(self.parent, u, t) else: raise RuntimeError("I don't know how to download a %s" % u) + self._all_downloads[dl] = None d = dl.start() return d diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 64ae6469..f76c674d 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1400,6 +1400,25 @@ class IUploadStatus(Interface): helper providing progress reports. It might be reasonable to add all three numbers and report the sum to the user.""" +class IDownloadStatus(Interface): + def get_storage_index(): + """Return a string with the (binary) storage index in use on this + download. This may be None if there is no storage index (i.e. LIT + files).""" + def get_size(): + """Return an integer with the number of bytes that will eventually be + retrieved for this file. Returns None if the size is not yet known. + """ + def using_helper(): + """Return True if this download is using a Helper, False if not.""" + def get_status(): + """Return a string describing the current state of the download + process.""" + def get_progress(): + """Returns a float (from 0.0 to 1.0) describing the amount of the + download that has completed. This value will remain at 0.0 until the + first byte of plaintext is pushed to the download target.""" + class NotCapableError(Exception): """You have tried to write to a read-only node."""