class DecryptingTarget(log.PrefixingLogMixin):
implements(IDownloadTarget, IConsumer)
- def __init__(self, downloadable, key, _log_msg_id=None):
- precondition(IDownloadTarget.providedBy(downloadable), downloadable)
- self.downloadable = downloadable
+ def __init__(self, target, key, _log_msg_id=None):
+ precondition(IDownloadTarget.providedBy(target), target)
+ self.target = target
self._decryptor = AES(key)
- prefix = str(downloadable)
+ prefix = str(target)
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
def registerProducer(self, producer, streaming):
- if IConsumer.providedBy(self.downloadable):
- self.downloadable.registerProducer(producer, streaming)
+ if IConsumer.providedBy(self.target):
+ self.target.registerProducer(producer, streaming)
def unregisterProducer(self):
- if IConsumer.providedBy(self.downloadable):
- self.downloadable.unregisterProducer()
+ if IConsumer.providedBy(self.target):
+ self.target.unregisterProducer()
def write(self, ciphertext):
plaintext = self._decryptor.process(ciphertext)
- self.downloadable.write(plaintext)
+ self.target.write(plaintext)
def open(self, size):
- self.downloadable.open(size)
+ self.target.open(size)
def close(self):
- self.downloadable.close()
+ self.target.close()
def finish(self):
- return self.downloadable.finish()
+ return self.target.finish()
class ValidatedThingObtainer:
def __init__(self, validatedthingproxies, debugname, log_id):
"""I am responsible for downloading all the blocks for a single segment
of data.
- I am a child of the FileDownloader.
+ I am a child of the CiphertextDownloader.
"""
def __init__(self, parent, segmentnumber, needed_shares, results):
def set_results(self, value):
self.results = value
-class FileDownloader(log.PrefixingLogMixin):
+class CiphertextDownloader(log.PrefixingLogMixin):
""" I download shares, check their integrity, then decode them, check the integrity of the
resulting ciphertext, and then write it to my target. """
implements(IPushProducer)
_status = None
- def __init__(self, client, u, downloadable):
- precondition(IVerifierURI.providedBy(u), u)
- precondition(IDownloadTarget.providedBy(downloadable), downloadable)
+ def __init__(self, client, v, target):
+ precondition(IVerifierURI.providedBy(v), v)
+ precondition(IDownloadTarget.providedBy(target), target)
- prefix=base32.b2a_l(u.get_storage_index()[:8], 60)
+ prefix=base32.b2a_l(v.get_storage_index()[:8], 60)
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self._client = client
- self._uri = u
- self._storage_index = u.get_storage_index()
- self._uri_extension_hash = u.uri_extension_hash
+ self._verifycap = v
+ self._storage_index = v.get_storage_index()
+ self._uri_extension_hash = v.uri_extension_hash
self._vup = None # ValidatedExtendedURIProxy
self._started = time.time()
self._status = s = DownloadStatus()
s.set_status("Starting")
s.set_storage_index(self._storage_index)
- s.set_size(self._uri.size)
+ s.set_size(self._verifycap.size)
s.set_helper(False)
s.set_active(True)
self._results = DownloadResults()
s.set_results(self._results)
- self._results.file_size = self._uri.size
+ self._results.file_size = self._verifycap.size
self._results.timings["servers_peer_selection"] = {}
self._results.timings["fetch_per_server"] = {}
self._results.timings["cumulative_fetch"] = 0.0
self._paused = False
self._stopped = False
- if IConsumer.providedBy(downloadable):
- downloadable.registerProducer(self, True)
- self._downloadable = downloadable
+ if IConsumer.providedBy(target):
+ target.registerProducer(self, True)
+ self._target = target
self._opened = False
self.active_buckets = {} # k: shnum, v: bucket
self._ciphertext_hasher = hashutil.crypttext_hasher()
self._bytes_done = 0
- self._status.set_progress(float(self._bytes_done)/self._uri.size)
+ self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
# _got_uri_extension() will create the following:
# self._crypttext_hash_tree
self._status.set_status("Finished")
self._status.set_active(False)
self._status.set_paused(False)
- if IConsumer.providedBy(self._downloadable):
- self._downloadable.unregisterProducer()
+ if IConsumer.providedBy(self._target):
+ self._target.unregisterProducer()
return res
d.addBoth(_finished)
def _failed(why):
now = time.time()
self._results.timings["peer_selection"] = now - self._started
- if len(self._share_buckets) < self._uri.needed_shares:
- raise NotEnoughSharesError(len(self._share_buckets), self._uri.needed_shares)
+ if len(self._share_buckets) < self._verifycap.needed_shares:
+ raise NotEnoughSharesError(len(self._share_buckets), self._verifycap.needed_shares)
#for s in self._share_vbuckets.values():
# for vb in s:
vups = []
for sharenum, bucket in self._share_buckets:
- vups.append(ValidatedExtendedURIProxy(bucket, self._uri, self._fetch_failures))
+ vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
d = vto.start()
self._vup = vup
self._codec = codec.CRSDecoder()
- self._codec.set_params(self._vup.segment_size, self._uri.needed_shares, self._uri.total_shares)
+ self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
self._tail_codec = codec.CRSDecoder()
- self._tail_codec.set_params(self._vup.tail_segment_size, self._uri.needed_shares, self._uri.total_shares)
+ self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
self._current_segnum = 0
- self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares)
+ self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
self._share_hash_tree.set_hashes({0: vup.share_root_hash})
self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
"""either return a mapping from shnum to a ValidatedReadBucketProxy that can
provide data for that share, or raise NotEnoughSharesError"""
- while len(self.active_buckets) < self._uri.needed_shares:
+ while len(self.active_buckets) < self._verifycap.needed_shares:
# need some more
handled_shnums = set(self.active_buckets.keys())
available_shnums = set(self._share_vbuckets.keys())
potential_shnums = list(available_shnums - handled_shnums)
- if len(potential_shnums) < (self._uri.needed_shares - len(self.active_buckets)):
+ if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
raise NotEnoughSharesError
# For the next share, choose a primary share if available, else a randomly chosen
# secondary share.
potential_shnums.sort()
- if potential_shnums[0] < self._uri.needed_shares:
+ if potential_shnums[0] < self._verifycap.needed_shares:
shnum = potential_shnums[0]
else:
shnum = random.choice(potential_shnums)
100.0 * segnum / self._vup.num_segments))
# memory footprint: when the SegmentDownloader finishes pulling down
# all shares, we have 1*segment_size of usage.
- segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
+ segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
self._results)
started = time.time()
d = segmentdler.start()
def _got_segment(self, buffers):
precondition(self._crypttext_hash_tree)
started_decrypt = time.time()
- self._status.set_progress(float(self._current_segnum)/self._uri.size)
+ self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
if self._current_segnum + 1 == self._vup.num_segments:
# This is the last segment.
# Trim off any padding added by the upload side. We never send empty segments. If
# the data was an exact multiple of the segment size, the last segment will be full.
- tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._uri.needed_shares)
+ tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
# Remove buffers which don't contain any part of the tail.
del buffers[num_buffers_used:]
# Then write this segment to the target.
if not self._opened:
self._opened = True
- self._downloadable.open(self._uri.size)
+ self._target.open(self._verifycap.size)
for buffer in buffers:
- self._downloadable.write(buffer)
+ self._target.write(buffer)
self._bytes_done += len(buffer)
- self._status.set_progress(float(self._bytes_done)/self._uri.size)
+ self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
self._current_segnum += 1
if self._results:
"bad crypttext_hash: computed=%s, expected=%s" %
(base32.b2a(self._ciphertext_hasher.digest()),
base32.b2a(self._vup.crypttext_hash)))
- _assert(self._bytes_done == self._uri.size, self._bytes_done, self._uri.size)
+ _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
self._status.set_progress(1)
- self._downloadable.close()
- return self._downloadable.finish()
+ self._target.close()
+ return self._target.finish()
def get_download_status(self):
return self._status
self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
- dl = FileDownloader(self.parent, u.get_verify_cap(), target)
+ dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target)
self._add_download(dl)
d = dl.start()
return d