pass
class Output:
- def __init__(self, downloadable, key, total_length, log_parent):
+ def __init__(self, downloadable, key, total_length, log_parent,
+ download_status):
self.downloadable = downloadable
self._decryptor = AES(key)
self._crypttext_hasher = hashutil.crypttext_hasher()
self._crypttext_hash_tree = None
self._opened = False
self._log_parent = log_parent
-
- def get_progress(self):
- return float(self.length) / self.total_length
+ self._status = download_status
+ self._status.set_progress(0.0)
def log(self, *args, **kwargs):
if "parent" not in kwargs:
def write_segment(self, crypttext):
self.length += len(crypttext)
+ self._status.set_progress( float(self.length) / self.total_length )
# memory footprint: 'crypttext' is the only segment_size usage
# outstanding. While we decrypt it into 'plaintext', we hit
def bucket_failed(self, vbucket):
self.parent.bucket_failed(vbucket)
+class DownloadStatus:
+ implements(IDownloadStatus)
+
+ def __init__(self):
+ self.storage_index = None
+ self.size = None
+ self.helper = False
+ self.status = "Not started"
+ self.progress = 0.0
+ self.paused = False
+ self.stopped = False
+
+ def get_storage_index(self):
+ return self.storage_index
+ def get_size(self):
+ return self.size
+ def using_helper(self):
+ return self.helper
+ def get_status(self):
+ status = self.status
+ if self.paused:
+ status += " (output paused)"
+ if self.stopped:
+ status += " (output stopped)"
+ return status
+ def get_progress(self):
+ return self.progress
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_size(self, size):
+ self.size = size
+ def set_helper(self, helper):
+ self.helper = helper
+ def set_status(self, status):
+ self.status = status
+ def set_paused(self, paused):
+ self.paused = paused
+ def set_stopped(self, stopped):
+ self.stopped = stopped
+ def set_progress(self, value):
+ self.progress = value
+
+
class FileDownloader:
- implements(IPushProducer, IDownloadStatus)
+ implements(IPushProducer)
check_crypttext_hash = True
check_plaintext_hash = True
+ _status = None
def __init__(self, client, u, downloadable):
self._client = client
self.init_logging()
- self._status = "Starting"
+ self._status = s = DownloadStatus()
+ s.set_status("Starting")
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
- self._output = Output(downloadable, u.key, self._size, self._log_number)
+ self._output = Output(downloadable, u.key, self._size, self._log_number,
+ self._status)
self._paused = False
self._stopped = False
if self._paused:
return
self._paused = defer.Deferred()
+ if self._status:
+ self._status.set_paused(True)
def resumeProducing(self):
if self._paused:
p = self._paused
self._paused = None
eventually(p.callback, None)
+ if self._status:
+ self._status.set_paused(False)
def stopProducing(self):
self.log("Download.stopProducing")
self._stopped = True
+ if self._status:
+ self._status.set_stopped(True)
def start(self):
self.log("starting download")
# once we know that, we can download blocks from everybody
d.addCallback(self._download_all_segments)
def _finished(res):
- self._status = "Finished"
+ if self._status:
+ self._status.set_status("Finished")
if IConsumer.providedBy(self._downloadable):
self._downloadable.unregisterProducer()
return res
d.addBoth(_finished)
def _failed(why):
- self._status = "Failed"
+ if self._status:
+ self._status.set_status("Failed")
self._output.fail(why)
return why
d.addErrback(_failed)
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
- self._status = "Locating Shares (%d/%d)" % (self._responses_received,
- self._queries_sent)
+ if self._status:
+ self._status.set_status("Locating Shares (%d/%d)" %
+ (self._responses_received,
+ self._queries_sent))
return defer.DeferredList(dl)
def _got_response(self, buckets):
self._responses_received += 1
- self._status = "Locating Shares (%d/%d)" % (self._responses_received,
- self._queries_sent)
+ if self._status:
+ self._status.set_status("Locating Shares (%d/%d)" %
+ (self._responses_received,
+ self._queries_sent))
for sharenum, bucket in buckets.iteritems():
b = storage.ReadBucketProxy(bucket)
self.add_share_bucket(sharenum, b)
# all are supposed to be identical. We compute the hash of the data
# that comes back, and compare it against the version in our URI. If
# they don't match, ignore their data and try someone else.
- self._status = "Obtaining URI Extension"
+ if self._status:
+ self._status.set_status("Obtaining URI Extension")
def _validate(proposal, bucket):
h = hashutil.uri_extension_hash(proposal)
self._share_hashtree.set_hashes({0: self._roothash})
def _get_hashtrees(self, res):
- self._status = "Retrieving Hash Trees"
+ if self._status:
+ self._status.set_status("Retrieving Hash Trees")
d = self._get_plaintext_hashtrees()
d.addCallback(self._get_crypttext_hashtrees)
d.addCallback(self._setup_hashtrees)
return res
def _download_segment(self, res, segnum):
- self._status = "Downloading segment %d of %d" % (segnum,
- self._total_segments)
+ if self._status:
+ self._status.set_status("Downloading segment %d of %d" %
+ (segnum, self._total_segments))
self.log("downloading seg#%d of %d (%d%%)"
% (segnum, self._total_segments,
100.0 * segnum / self._total_segments))
got=self._output.length, expected=self._size)
return self._output.finish()
- def get_storage_index(self):
- return self._storage_index
- def get_size(self):
- return self._size
- def using_helper(self):
- return False
- def get_status(self):
- status = self._status
- if self._paused:
- status += " (output paused)"
- if self._stopped:
- status += " (output stopped)"
- return status
- def get_progress(self):
- return self._output.get_progress()
+ def get_download_status(self):
+ return self._status
+
class LiteralDownloader:
implements(IDownloadStatus)
self._uri = IFileURI(u)
assert isinstance(self._uri, uri.LiteralFileURI)
self._downloadable = downloadable
+ self._status = s = DownloadStatus()
+ s.set_storage_index(None)
+ s.set_helper(False)
+ s.set_status("Done")
+ s.set_progress(1.0)
def start(self):
data = self._uri.data
+ self._status.set_size(len(data))
self._downloadable.open(len(data))
self._downloadable.write(data)
self._downloadable.close()
return defer.maybeDeferred(self._downloadable.finish)
- def get_storage_index(self):
- return None
- def get_size(self):
- return len(self._uri.data)
- def using_helper(self):
- return False
- def get_status(self):
- return "Done"
- def get_progress(self):
- return 1.0
+ def get_download_status(self):
+ return self._status
class FileName:
implements(IDownloadTarget)
dl = FileDownloader(self.parent, u, t)
else:
raise RuntimeError("I don't know how to download a %s" % u)
- self._all_downloads[dl] = None
+ self._all_downloads[dl.get_download_status()] = None
d = dl.start()
return d