self._paused = False
self._stopped = False
+ self._results = None
self.active_buckets = {} # k: shnum, v: bucket
self._share_buckets = [] # list of (sharenum, bucket) tuples
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
-import os, random, weakref, itertools
+import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.application import service
from foolscap.eventual import eventually
-from allmydata.util import base32, mathutil, hashutil, log, idlib
+from allmydata.util import base32, mathutil, hashutil, log
from allmydata.util.assertutil import _assert
from allmydata import codec, hashtree, storage, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
- IDownloadStatus
+ IDownloadStatus, IDownloadResults
from allmydata.encode import NotEnoughPeersError
from pycryptopp.cipher.aes import AES
class DownloadStopped(Exception):
pass
+class DownloadResults:
+ implements(IDownloadResults)
+
+ def __init__(self):
+ self.servers_used = set()
+ self.server_problems = {}
+ self.servermap = {}
+ self.timings = {}
+ self.file_size = None
+
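+ # Illustrative only (made-up values): the downloader fills these fields
+ # in as it runs, and the web status page later reads them back through
+ # DownloadStatus.get_results():
+ #
+ # res = DownloadResults()
+ # res.file_size = 12345 # bytes
+ # res.servers_used.add(peerid) # peerid is a binary nodeid
+ # res.servermap[peerid] = set([0, 3]) # shares held by that server
+ # res.timings["total"] = 1.23 # seconds, as a float
+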
class Output:
def __init__(self, downloadable, key, total_length, log_parent,
download_status):
self.paused = False
self.stopped = False
self.active = True
+ self.results = None
self.counter = self.statusid_counter.next()
def get_storage_index(self):
return self.storage_index
def get_active(self):
return self.active
+ def get_results(self):
+ return self.results
def get_counter(self):
return self.counter
def set_progress(self, value):
self.progress = value
def set_active(self, value):
self.active = value
+ def set_results(self, value):
+ self.results = value
class FileDownloader:
implements(IPushProducer)
self._si_s = storage.si_b2a(self._storage_index)
self.init_logging()
+ self._started = time.time()
self._status = s = DownloadStatus()
s.set_status("Starting")
s.set_storage_index(self._storage_index)
s.set_helper(False)
s.set_active(True)
+ self._results = DownloadResults()
+ s.set_results(self._results)
+ self._results.file_size = self._size
+
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
def start(self):
self.log("starting download")
+ if self._results:
+ self._results.timings["servers_peer_selection"] = {}
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
dl = []
for (peerid,ss) in self._client.get_permuted_peers("storage",
self._storage_index):
- peerid_s = idlib.shortnodeid_b2a(peerid)
d = ss.callRemote("get_buckets", self._storage_index)
d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(peerid_s,))
+ callbackArgs=(peerid,))
dl.append(d)
self._responses_received = 0
self._queries_sent = len(dl)
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
return defer.DeferredList(dl)
- def _got_response(self, buckets, peerid_s):
+ def _got_response(self, buckets, peerid):
self._responses_received += 1
+ if self._results:
+ elapsed = time.time() - self._started
+ self._results.timings["servers_peer_selection"][peerid] = elapsed
if self._status:
self._status.set_status("Locating Shares (%d/%d)" %
(self._responses_received,
self._queries_sent))
for sharenum, bucket in buckets.iteritems():
- b = storage.ReadBucketProxy(bucket, peerid_s, self._si_s)
+ b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
del self._share_vbuckets[shnum]
def _got_all_shareholders(self, res):
+ if self._results:
+ now = time.time()
+ self._results.timings["peer_selection"] = now - self._started
+
if len(self._share_buckets) < self._num_needed_shares:
raise NotEnoughPeersError
if self._status:
self._status.set_status("Obtaining URI Extension")
+ self._uri_extension_fetch_started = time.time()
def _validate(proposal, bucket):
h = hashutil.uri_extension_hash(proposal)
if h != self._uri_extension_hash:
return d
def _got_uri_extension(self, uri_extension_data):
+ if self._results:
+ elapsed = time.time() - self._uri_extension_fetch_started
+ self._results.timings["uri_extension"] = elapsed
+
d = self._uri_extension_data = uri_extension_data
self._codec = codec.get_decoder_by_name(d['codec_name'])
self._share_hashtree.set_hashes({0: self._roothash})
def _get_hashtrees(self, res):
+ self._get_hashtrees_started = time.time()
if self._status:
self._status.set_status("Retrieving Hash Trees")
d = self._get_plaintext_hashtrees()
def _setup_hashtrees(self, res):
self._output.setup_hashtrees(self._plaintext_hashtree,
self._crypttext_hashtree)
-
+ if self._results:
+ elapsed = time.time() - self._get_hashtrees_started
+ self._results.timings["hashtrees"] = elapsed
def _create_validated_buckets(self, ignored=None):
self._share_vbuckets = {}
# RIBucketReader references.
self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
+ self._started_fetching = time.time()
+
d = defer.succeed(None)
for segnum in range(self._total_segments-1):
d.addCallback(self._download_segment, segnum)
def _done(self, res):
self.log("download done")
+ if self._results:
+ now = time.time()
+ self._results.timings["total"] = now - self._started
+ self._results.timings["fetching"] = now - self._started_fetching
self._output.close()
if self.check_crypttext_hash:
_assert(self._crypttext_hash == self._output.crypttext_hash,
class IUploadResults(Interface):
"""I am returned by upload() methods. I contain a number of public
- attributes which can be read to determine the results of the upload::
+ attributes which can be read to determine the results of the upload. Some
+ of these are functional, some are timing information. All of these may be
+ None.::
+ .file_size : the size of the file, in bytes
.uri : the CHK read-cap for the file
+ .ciphertext_fetched : how many bytes were fetched by the helper
+ .sharemap : dict mapping share number to placement string
+ .servermap : dict mapping server peerid to a set of share numbers
+ .timings : dict of timing information, mapping name to seconds (float)
+ total : total upload time, start to finish
+ storage_index : time to compute the storage index
+ peer_selection : time to decide which peers will be used
+ contacting_helper : initial helper query to upload/no-upload decision
+ existence_check : helper pre-upload existence check
+ helper_total : initial helper query to helper finished pushing
+ cumulative_fetch : helper waiting for ciphertext requests
+ total_fetch : helper start to last ciphertext response
+ cumulative_encoding : just time spent in zfec
+ cumulative_sending : just time spent waiting for storage servers
+ hashes_and_close : last segment push to shareholder close
+ total_encode_and_push : first encode to shareholder close
+
+ """
+
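+ # A minimal consumption sketch (hypothetical helper, not part of the
+ # interface): every attribute above may be None, so guard before doing
+ # arithmetic on them.
+ def _example_upload_rate(results):
+ """Return the overall upload rate in bytes per second, or None."""
+ total = results.timings.get("total")
+ if results.file_size is None or not total:
+ return None
+ return 1.0 * results.file_size / total
+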
+class IDownloadResults(Interface):
+ """I am created internally by download() methods. I contain a number of
+ public attributes that describe the download process.::
+
+ .file_size : the size of the file, in bytes
+ .servers_used : set of server peerids that were used during download
+ .server_problems : dict mapping server peerid to a problem string. Only
+ servers that had problems (bad hashes, disconnects) are
+ listed here.
+ .servermap : dict mapping server peerid to a set of share numbers. Only
+ servers that had any shares are listed here.
+ .timings : dict of timing information, mapping name to seconds (float)
+ peer_selection : time to ask servers about shares
+ servers_peer_selection : dict of peerid to DYHB-query time
+ uri_extension : time to fetch a copy of the URI extension block
+ hashtrees : time to fetch the hash trees
+ fetching : time to fetch, decode, and deliver segments
+ cumulative_fetching : time spent waiting for storage servers
+ cumulative_decoding : just time spent in zfec
+ total : total download time, start to finish
+ servers_fetching : dict of peerid to list of per-segment fetch times
"""
self.log("no readers, so no UEB", level=log.NOISY)
return
b,peerid = self._readers.pop()
- rbp = storage.ReadBucketProxy(b, idlib.shortnodeid_b2a(peerid),
+ rbp = storage.ReadBucketProxy(b, peerid,
storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())
class ReadBucketProxy:
implements(IStorageBucketReader)
- def __init__(self, rref, peerid_s=None, storage_index_s=None):
+ def __init__(self, rref, peerid=None, storage_index_s=None):
self._rref = rref
- self._peerid_s = peerid_s
+ self._peerid = peerid
self._si_s = storage_index_s
self._started = False
def __repr__(self):
- return "<ReadBucketProxy to peer [%s] SI %s>" % (self._peerid_s,
+ peerid_s = idlib.shortnodeid_b2a(self._peerid or "")
+ return "<ReadBucketProxy to peer [%s] SI %s>" % (peerid_s,
self._si_s)
def startIfNecessary(self):
<li>Status: <span n:render="status"/></li>
</ul>
+
+<div n:render="results">
+ <h2>Download Results</h2>
+ <ul>
+ <li>Servers Used: <span n:render="servers_used" /></li>
+ <li>Servermap: <span n:render="servermap" /></li>
+ <li>Timings:</li>
+ <ul>
+ <li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
+ <li>Total: <span n:render="time" n:data="time_total" />
+ (<span n:render="rate" n:data="rate_total" />)</li>
+ <ul>
+ <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
+ <li>UEB Fetch: <span n:render="time" n:data="time_uri_extension" /></li>
+ <li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees" /></li>
+ <li>Segment Fetch: <span n:render="time" n:data="time_fetching" />
+ (<span n:render="rate" n:data="rate_fetching" />)</li>
+ <ul>
+ <li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch" />
+ (<span n:render="rate" n:data="rate_fetch" />)</li>
+ <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding" />
+ (<span n:render="rate" n:data="rate_decode" />)</li>
+ </ul>
+ </ul>
+ </ul>
+ </ul>
+</div>
+
<div>Return to the <a href="/">Welcome Page</a></div>
</body>
def render_status(self, ctx, data):
return data.get_status()
-class DownloadStatusPage(rend.Page):
+class DownloadResultsRendererMixin:
+ # this requires a method named 'download_results'
+
+ def render_servers_used(self, ctx, data):
+ d = self.download_results()
+ d.addCallback(lambda res: res.servers_used)
+ def _render(servers_used):
+ if not servers_used:
+ return "None"
+ return ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
+ for peerid in sorted(servers_used)])
+ d.addCallback(_render)
+ return d
+
+ def render_servermap(self, ctx, data):
+ d = self.download_results()
+ d.addCallback(lambda res: res.servermap)
+ def _render(servermap):
+ if servermap is None:
+ return "None"
+ l = T.ul()
+ for peerid in sorted(servermap.keys()):
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
+ l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+ return l
+ d.addCallback(_render)
+ return d
+
+ def data_file_size(self, ctx, data):
+ d = self.download_results()
+ d.addCallback(lambda res: res.file_size)
+ return d
+
+ def render_time(self, ctx, data):
+ # 1.23s, 790ms, 132us
+ if data is None:
+ return ""
+ s = float(data)
+ if s >= 1.0:
+ return "%.2fs" % s
+ if s >= 0.01:
+ return "%dms" % (1000*s)
+ if s >= 0.001:
+ return "%.1fms" % (1000*s)
+ return "%dus" % (1000000*s)
+
+ def render_rate(self, ctx, data):
+ # 21.8kBps, 554.4kBps, 4.37MBps
+ if data is None:
+ return ""
+ r = float(data)
+ if r > 1000000:
+ return "%1.2fMBps" % (r/1000000)
+ if r > 1000:
+ return "%.1fkBps" % (r/1000)
+ return "%dBps" % r
+
+ def _get_time(self, name):
+ d = self.download_results()
+ d.addCallback(lambda res: res.timings.get(name))
+ return d
+
+ def data_time_total(self, ctx, data):
+ return self._get_time("total")
+
+ def data_time_peer_selection(self, ctx, data):
+ return self._get_time("peer_selection")
+
+ def data_time_uri_extension(self, ctx, data):
+ return self._get_time("uri_extension")
+
+ def data_time_hashtrees(self, ctx, data):
+ return self._get_time("hashtrees")
+
+ def data_time_fetching(self, ctx, data):
+ return self._get_time("fetching")
+
+ def data_time_cumulative_fetch(self, ctx, data):
+ return self._get_time("cumulative_fetching")
+
+ def data_time_cumulative_decoding(self, ctx, data):
+ return self._get_time("cumulative_decoding")
+
+ def _get_rate(self, name):
+ d = self.download_results()
+ def _convert(r):
+ file_size = r.file_size
+ time = r.timings.get(name)
+ # file_size may be None (the interface allows it), which would make
+ # the division below raise TypeError, so guard both values explicitly
+ if file_size is None or time is None:
+ return None
+ try:
+ return 1.0 * file_size / time
+ except ZeroDivisionError:
+ return None
+ d.addCallback(_convert)
+ return d
+
+ def data_rate_total(self, ctx, data):
+ return self._get_rate("total")
+
+ def data_rate_fetching(self, ctx, data):
+ return self._get_rate("fetching")
+
+ def data_rate_decode(self, ctx, data):
+ return self._get_rate("cumulative_decoding")
+
+ def data_rate_fetch(self, ctx, data):
+ return self._get_rate("cumulative_fetching")
+
+class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
docFactory = getxmlfile("download-status.xhtml")
+ def __init__(self, data):
+ rend.Page.__init__(self, data)
+ self.download_status = data
+
+ def download_results(self):
+ return defer.maybeDeferred(self.download_status.get_results)
+
+ def render_results(self, ctx, data):
+ d = self.download_results()
+ def _got_results(results):
+ if results:
+ return ctx.tag
+ return ""
+ d.addCallback(_got_results)
+ return d
+
def render_si(self, ctx, data):
si_s = base32.b2a_or_none(data.get_storage_index())
if si_s is None: