-import os, random
+import os, random, weakref
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from allmydata.util import idlib, mathutil, hashutil, log
from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI
+from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
+ IDownloadStatus
from allmydata.encode import NotEnoughPeersError
from pycryptopp.cipher.aes import AES
self._opened = False
self._log_parent = log_parent
+ def get_progress(self):
+ return float(self.length) / self.total_length
+
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_parent
self.parent.bucket_failed(vbucket)
class FileDownloader:
- implements(IPushProducer)
+ implements(IPushProducer, IDownloadStatus)
check_crypttext_hash = True
check_plaintext_hash = True
self.init_logging()
+ self._status = "Starting"
+
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
# once we know that, we can download blocks from everybody
d.addCallback(self._download_all_segments)
def _finished(res):
+ self._status = "Finished"
if IConsumer.providedBy(self._downloadable):
self._downloadable.unregisterProducer()
return res
d.addBoth(_finished)
def _failed(why):
+ self._status = "Failed"
self._output.fail(why)
return why
d.addErrback(_failed)
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error)
dl.append(d)
+ self._responses_received = 0
+ self._queries_sent = len(dl)
+ self._status = "Locating Shares (%d/%d)" % (self._responses_received,
+ self._queries_sent)
return defer.DeferredList(dl)
def _got_response(self, buckets):
+ self._responses_received += 1
+ self._status = "Locating Shares (%d/%d)" % (self._responses_received,
+ self._queries_sent)
for sharenum, bucket in buckets.iteritems():
b = storage.ReadBucketProxy(bucket)
self.add_share_bucket(sharenum, b)
def _got_all_shareholders(self, res):
if len(self._share_buckets) < self._num_needed_shares:
raise NotEnoughPeersError
+
#for s in self._share_vbuckets.values():
# for vb in s:
# assert isinstance(vb, ValidatedBucket), \
# all are supposed to be identical. We compute the hash of the data
# that comes back, and compare it against the version in our URI. If
# they don't match, ignore their data and try someone else.
+ self._status = "Obtaining URI Extension"
+
def _validate(proposal, bucket):
h = hashutil.uri_extension_hash(proposal)
if h != self._uri_extension_hash:
self._share_hashtree.set_hashes({0: self._roothash})
def _get_hashtrees(self, res):
+ self._status = "Retrieving Hash Trees"
d = self._get_plaintext_hashtrees()
d.addCallback(self._get_crypttext_hashtrees)
d.addCallback(self._setup_hashtrees)
return res
def _download_segment(self, res, segnum):
+ self._status = "Downloading segment %d of %d" % (segnum,
+ self._total_segments)
self.log("downloading seg#%d of %d (%d%%)"
% (segnum, self._total_segments,
100.0 * segnum / self._total_segments))
got=self._output.length, expected=self._size)
return self._output.finish()
+ def get_storage_index(self):
+ return self._storage_index
+ def get_size(self):
+ return self._size
+ def using_helper(self):
+ return False
+ def get_status(self):
+ status = self._status
+ if self._paused:
+ status += " (output paused)"
+ if self._stopped:
+ status += " (output stopped)"
+ return status
+ def get_progress(self):
+ return self._output.get_progress()
+
class LiteralDownloader:
+ implements(IDownloadStatus)
+
def __init__(self, client, u, downloadable):
self._uri = IFileURI(u)
assert isinstance(self._uri, uri.LiteralFileURI)
self._downloadable.close()
return defer.maybeDeferred(self._downloadable.finish)
+ def get_storage_index(self):
+ return None
+ def get_size(self):
+ return len(self._uri.data)
+ def using_helper(self):
+ return False
+ def get_status(self):
+ return "Done"
+ def get_progress(self):
+ return 1.0
class FileName:
implements(IDownloadTarget)
implements(IDownloader)
name = "downloader"
+ def __init__(self):
+ service.MultiService.__init__(self)
+ self._all_downloads = weakref.WeakKeyDictionary()
+
def download(self, u, t):
assert self.parent
assert self.running
dl = FileDownloader(self.parent, u, t)
else:
raise RuntimeError("I don't know how to download a %s" % u)
+ self._all_downloads[dl] = None
d = dl.start()
return d