add download-status objects, to track download progress
authorBrian Warner <warner@allmydata.com>
Tue, 12 Feb 2008 22:38:39 +0000 (15:38 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 12 Feb 2008 22:38:39 +0000 (15:38 -0700)
src/allmydata/download.py
src/allmydata/interfaces.py

index a8c6a3e91b84a1e2cc72354b089a2d6f537e6682..eb5cd539b2acba6fb3fcd1e6f9ae71cd14d209b3 100644 (file)
@@ -1,5 +1,5 @@
 
-import os, random
+import os, random, weakref
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
@@ -9,7 +9,8 @@ from foolscap.eventual import eventually
 from allmydata.util import idlib, mathutil, hashutil, log
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI
+from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
+     IDownloadStatus
 from allmydata.encode import NotEnoughPeersError
 from pycryptopp.cipher.aes import AES
 
@@ -41,6 +42,9 @@ class Output:
         self._opened = False
         self._log_parent = log_parent
 
+    def get_progress(self):
+        return float(self.length) / self.total_length
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_parent
@@ -321,7 +325,7 @@ class SegmentDownloader:
         self.parent.bucket_failed(vbucket)
 
 class FileDownloader:
-    implements(IPushProducer)
+    implements(IPushProducer, IDownloadStatus)
     check_crypttext_hash = True
     check_plaintext_hash = True
 
@@ -337,6 +341,8 @@ class FileDownloader:
 
         self.init_logging()
 
+        self._status = "Starting"
+
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
         self._downloadable = downloadable
@@ -400,11 +406,13 @@ class FileDownloader:
         # once we know that, we can download blocks from everybody
         d.addCallback(self._download_all_segments)
         def _finished(res):
+            self._status = "Finished"
             if IConsumer.providedBy(self._downloadable):
                 self._downloadable.unregisterProducer()
             return res
         d.addBoth(_finished)
         def _failed(why):
+            self._status = "Failed"
             self._output.fail(why)
             return why
         d.addErrback(_failed)
@@ -418,9 +426,16 @@ class FileDownloader:
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error)
             dl.append(d)
+        self._responses_received = 0
+        self._queries_sent = len(dl)
+        self._status = "Locating Shares (%d/%d)" % (self._responses_received,
+                                                    self._queries_sent)
         return defer.DeferredList(dl)
 
     def _got_response(self, buckets):
+        self._responses_received += 1
+        self._status = "Locating Shares (%d/%d)" % (self._responses_received,
+                                                    self._queries_sent)
         for sharenum, bucket in buckets.iteritems():
             b = storage.ReadBucketProxy(bucket)
             self.add_share_bucket(sharenum, b)
@@ -448,6 +463,7 @@ class FileDownloader:
     def _got_all_shareholders(self, res):
         if len(self._share_buckets) < self._num_needed_shares:
             raise NotEnoughPeersError
+
         #for s in self._share_vbuckets.values():
         #    for vb in s:
         #        assert isinstance(vb, ValidatedBucket), \
@@ -461,6 +477,8 @@ class FileDownloader:
         # all are supposed to be identical. We compute the hash of the data
         # that comes back, and compare it against the version in our URI. If
         # they don't match, ignore their data and try someone else.
+        self._status = "Obtaining URI Extension"
+
         def _validate(proposal, bucket):
             h = hashutil.uri_extension_hash(proposal)
             if h != self._uri_extension_hash:
@@ -519,6 +537,7 @@ class FileDownloader:
         self._share_hashtree.set_hashes({0: self._roothash})
 
     def _get_hashtrees(self, res):
+        self._status = "Retrieving Hash Trees"
         d = self._get_plaintext_hashtrees()
         d.addCallback(self._get_crypttext_hashtrees)
         d.addCallback(self._setup_hashtrees)
@@ -634,6 +653,8 @@ class FileDownloader:
         return res
 
     def _download_segment(self, res, segnum):
+        self._status = "Downloading segment %d of %d" % (segnum,
+                                                         self._total_segments)
         self.log("downloading seg#%d of %d (%d%%)"
                  % (segnum, self._total_segments,
                     100.0 * segnum / self._total_segments))
@@ -709,7 +730,25 @@ class FileDownloader:
                 got=self._output.length, expected=self._size)
         return self._output.finish()
 
+    def get_storage_index(self):
+        return self._storage_index
+    def get_size(self):
+        return self._size
+    def using_helper(self):
+        return False
+    def get_status(self):
+        status = self._status
+        if self._paused:
+            status += " (output paused)"
+        if self._stopped:
+            status += " (output stopped)"
+        return status
+    def get_progress(self):
+        return self._output.get_progress()
+
 class LiteralDownloader:
+    implements(IDownloadStatus)
+
     def __init__(self, client, u, downloadable):
         self._uri = IFileURI(u)
         assert isinstance(self._uri, uri.LiteralFileURI)
@@ -722,6 +761,16 @@ class LiteralDownloader:
         self._downloadable.close()
         return defer.maybeDeferred(self._downloadable.finish)
 
+    def get_storage_index(self):
+        return None
+    def get_size(self):
+        return len(self._uri.data)
+    def using_helper(self):
+        return False
+    def get_status(self):
+        return "Done"
+    def get_progress(self):
+        return 1.0
 
 class FileName:
     implements(IDownloadTarget)
@@ -792,6 +841,10 @@ class Downloader(service.MultiService):
     implements(IDownloader)
     name = "downloader"
 
+    def __init__(self):
+        service.MultiService.__init__(self)
+        self._all_downloads = weakref.WeakKeyDictionary()
+
     def download(self, u, t):
         assert self.parent
         assert self.running
@@ -805,6 +858,7 @@ class Downloader(service.MultiService):
             dl = FileDownloader(self.parent, u, t)
         else:
             raise RuntimeError("I don't know how to download a %s" % u)
+        self._all_downloads[dl] = None
         d = dl.start()
         return d
 
index 64ae64699fa1aedb19d90d4dd006e1062b138040..f76c674dd434df75636efdf43c601bec51685dc6 100644 (file)
@@ -1400,6 +1400,25 @@ class IUploadStatus(Interface):
         helper providing progress reports. It might be reasonable to add all
         three numbers and report the sum to the user."""
 
+class IDownloadStatus(Interface):
+    def get_storage_index():
+        """Return a string with the (binary) storage index in use on this
+        download. This may be None if there is no storage index (i.e. LIT
+        files)."""
+    def get_size():
+        """Return an integer with the number of bytes that will eventually be
+        retrieved for this file. Returns None if the size is not yet known.
+        """
+    def using_helper():
+        """Return True if this download is using a Helper, False if not."""
+    def get_status():
+        """Return a string describing the current state of the download
+        process."""
+    def get_progress():
+        """Returns a float (from 0.0 to 1.0) describing the amount of the
+        download that has completed. This value will remain at 0.0 until the
+        first byte of plaintext is pushed to the download target."""
+
 
 class NotCapableError(Exception):
     """You have tried to write to a read-only node."""