self._results = DownloadResults()
s.set_results(self._results)
self._results.file_size = self._size
+ self._results.timings["servers_peer_selection"] = {}
+ self._results.timings["cumulative_fetch"] = 0.0
+ self._results.timings["cumulative_decode"] = 0.0
+ self._results.timings["cumulative_decrypt"] = 0.0
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
def start(self):
self.log("starting download")
- if self._results:
- self._results.timings["servers_peer_selection"] = {}
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
d.addCallback(self._got_all_shareholders)
if self._status:
self._status.set_status("Finished")
self._status.set_active(False)
+ self._status.set_paused(False)
if IConsumer.providedBy(self._downloadable):
self._downloadable.unregisterProducer()
return res
b = storage.ReadBucketProxy(bucket, peerid, self._si_s)
self.add_share_bucket(sharenum, b)
self._uri_extension_sources.append(b)
+ if self._results:
+ if peerid not in self._results.servermap:
+ self._results.servermap[peerid] = set()
+ self._results.servermap[peerid].add(sharenum)
def add_share_bucket(self, sharenum, bucket):
# this is split out for the benefit of test_encode.py
# memory footprint: when the SegmentDownloader finishes pulling down
# all shares, we have 1*segment_size of usage.
segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
+ started = time.time()
d = segmentdler.start()
+ def _finished_fetching(res):
+ elapsed = time.time() - started
+ self._results.timings["cumulative_fetch"] += elapsed
+ return res
+ if self._results:
+ d.addCallback(_finished_fetching)
# pause before using more memory
d.addCallback(self._check_for_pause)
# while the codec does its job, we hit 2*segment_size
+ def _started_decode(res):
+ self._started_decode = time.time()
+ return res
+ if self._results:
+ d.addCallback(_started_decode)
d.addCallback(lambda (shares, shareids):
self._codec.decode(shares, shareids))
# once the codec is done, we drop back to 1*segment_size, because
# 'shares' goes out of scope. The memory usage is all in the
# plaintext now, spread out into a bunch of tiny buffers.
+ def _finished_decode(res):
+ elapsed = time.time() - self._started_decode
+ self._results.timings["cumulative_decode"] += elapsed
+ return res
+ if self._results:
+ d.addCallback(_finished_decode)
# pause/check-for-stop just before writing, to honor stopProducing
d.addCallback(self._check_for_pause)
# we're down to 1*segment_size right now, but write_segment()
# will decrypt a copy of the segment internally, which will push
# us up to 2*segment_size while it runs.
+ started_decrypt = time.time()
self._output.write_segment(segment)
+ if self._results:
+ elapsed = time.time() - started_decrypt
+ self._results.timings["cumulative_decrypt"] += elapsed
d.addCallback(_done)
return d
% (segnum, self._total_segments,
100.0 * segnum / self._total_segments))
segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares)
+ started = time.time()
d = segmentdler.start()
+ def _finished_fetching(res):
+ elapsed = time.time() - started
+ self._results.timings["cumulative_fetch"] += elapsed
+ return res
+ if self._results:
+ d.addCallback(_finished_fetching)
# pause before using more memory
d.addCallback(self._check_for_pause)
+ def _started_decode(res):
+ self._started_decode = time.time()
+ return res
+ if self._results:
+ d.addCallback(_started_decode)
d.addCallback(lambda (shares, shareids):
self._tail_codec.decode(shares, shareids))
+ def _finished_decode(res):
+ elapsed = time.time() - self._started_decode
+ self._results.timings["cumulative_decode"] += elapsed
+ return res
+ if self._results:
+ d.addCallback(_finished_decode)
# pause/check-for-stop just before writing, to honor stopProducing
d.addCallback(self._check_for_pause)
def _done(buffers):
pad_size = mathutil.pad_size(self._size, self._segment_size)
tail_size = self._segment_size - pad_size
segment = segment[:tail_size]
+ started_decrypt = time.time()
self._output.write_segment(segment)
+ if self._results:
+ elapsed = time.time() - started_decrypt
+ self._results.timings["cumulative_decrypt"] += elapsed
d.addCallback(_done)
return d
if self._results:
now = time.time()
self._results.timings["total"] = now - self._started
- self._results.timings["fetching"] = now - self._started_fetching
+ self._results.timings["segments"] = now - self._started_fetching
self._output.close()
if self.check_crypttext_hash:
_assert(self._crypttext_hash == self._output.crypttext_hash,
"""I am created internally by download() methods. I contain a number of
public attributes which contain details about the download process.::
- .file_size : the size of the file, in bytes
+ .file_size : the size of the file, in bytes
.servers_used : set of server peerids that were used during download
.server_problems : dict mapping server peerid to a problem string. Only
servers that had problems (bad hashes, disconnects) are
listed here.
- .servermap : dict mapping server peerid to a set of share numbers. Only
- servers that had any shares are listed here.
+ .servermap : dict mapping server peerid to a set of share numbers. Only
+ servers that had any shares are listed here.
.timings : dict of timing information, mapping name to seconds (float)
peer_selection : time to ask servers about shares
servers_peer_selection : dict of peerid to DYHB-query time
uri_extension : time to fetch a copy of the URI extension block
hashtrees : time to fetch the hash trees
- fetching : time to fetch, decode, and deliver segments
- cumulative_fetching : time spent waiting for storage servers
- cumulative_decoding : just time spent in zfec
+ segments : time to fetch, decode, and deliver segments
+ cumulative_fetch : time spent waiting for storage servers
+ cumulative_decode : just time spent in zfec
+ cumulative_decrypt : just time spent in decryption
total : total download time, start to finish
servers_fetching : dict of peerid to list of per-segment fetch times
<li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>UEB Fetch: <span n:render="time" n:data="time_uri_extension" /></li>
<li>Hashtree Fetch: <span n:render="time" n:data="time_hashtrees" /></li>
- <li>Segment Fetch: <span n:render="time" n:data="time_fetching" />
- (<span n:render="rate" n:data="rate_fetching" />)</li>
+ <li>Segment Fetch: <span n:render="time" n:data="time_segments" />
+ (<span n:render="rate" n:data="rate_segments" />)</li>
<ul>
<li>Cumulative Fetching: <span n:render="time" n:data="time_cumulative_fetch" />
(<span n:render="rate" n:data="rate_fetch" />)</li>
- <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decoding" />
+ <li>Cumulative Decoding: <span n:render="time" n:data="time_cumulative_decode" />
(<span n:render="rate" n:data="rate_decode" />)</li>
+ <li>Cumulative Decrypting: <span n:render="time" n:data="time_cumulative_decrypt" />
+ (<span n:render="rate" n:data="rate_decrypt" />)</li>
</ul>
</ul>
</ul>
# XXX add redirect_to_result
return d
+def plural(sequence):
+ if len(sequence) == 1:
+ return ""
+ return "s"
+
class UploadResultsRendererMixin:
# this requires a method named 'upload_results'
l = T.ul()
for peerid in sorted(servermap.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
- shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
- l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+ shares_s = ",".join(["#%d" % shnum
+ for shnum in servermap[peerid]])
+ l[T.li["[%s] got share%s: %s" % (peerid_s,
+ plural(servermap[peerid]),
+ shares_s)]]
return l
d.addCallback(_render)
return d
l = T.ul()
for peerid in sorted(servermap.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
- shares_s = ",".join([str(shnum) for shnum in servermap[peerid]])
- l[T.li["[%s] got shares: %s" % (peerid_s, shares_s)]]
+ shares_s = ",".join(["#%d" % shnum
+ for shnum in servermap[peerid]])
+ l[T.li["[%s] has share%s: %s" % (peerid_s,
+ plural(servermap[peerid]),
+ shares_s)]]
return l
d.addCallback(_render)
return d
def data_time_hashtrees(self, ctx, data):
return self._get_time("hashtrees")
- def data_time_fetching(self, ctx, data):
- return self._get_time("fetching")
+ def data_time_segments(self, ctx, data):
+ return self._get_time("segments")
def data_time_cumulative_fetch(self, ctx, data):
return self._get_time("cumulative_fetch")
- def data_time_cumulative_decoding(self, ctx, data):
- return self._get_time("cumulative_decoding")
+ def data_time_cumulative_decode(self, ctx, data):
+ return self._get_time("cumulative_decode")
+
+ def data_time_cumulative_decrypt(self, ctx, data):
+ return self._get_time("cumulative_decrypt")
def _get_rate(self, name):
d = self.download_results()
def data_rate_total(self, ctx, data):
return self._get_rate("total")
- def data_rate_fetching(self, ctx, data):
- return self._get_rate("fetching")
+ def data_rate_segments(self, ctx, data):
+ return self._get_rate("segments")
+
+ def data_rate_fetch(self, ctx, data):
+ return self._get_rate("cumulative_fetch")
def data_rate_decode(self, ctx, data):
- return self._get_rate("cumulative_decoding")
+ return self._get_rate("cumulative_decode")
- def data_rate_fetch(self, ctx, data):
- return self._get_rate("cumulative_fetching")
+ def data_rate_decrypt(self, ctx, data):
+ return self._get_rate("cumulative_decrypt")
class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
docFactory = getxmlfile("download-status.xhtml")