d.addCallback(self._parse_and_validate)
return d
-class ValidatedReadBucketProxy:
+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
"""I am a front-end for a remote storage bucket, responsible for
retrieving and validating data from that bucket.
share_hash_tree, share_root_hash,
num_blocks):
""" share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
+ prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
+ log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
self._share_hash = None # None means not validated yet
self._share_hash = sht.get_leaf(self.sharenum)
blockhash = hashutil.block_hash(blockdata)
- #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+ #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
# "%r .. %r: %s" %
# (self.sharenum, blocknum, len(blockdata),
# blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
except (hashtree.BadHashError, hashtree.NotEnoughHashesError):
# log.WEIRD: indicates undetected disk/network error, or more
# likely a programming error
- log.msg("hash failure in block=%d, shnum=%d on %s" %
+ self.log("hash failure in block=%d, shnum=%d on %s" %
(blocknum, self.sharenum, self.bucket))
if self._share_hash:
- log.msg(""" failure occurred when checking the block_hash_tree.
+ self.log(""" failure occurred when checking the block_hash_tree.
This suggests that either the block data was bad, or that the
block hashes we received along with it were bad.""")
else:
- log.msg(""" the failure probably occurred when checking the
+ self.log(""" the failure probably occurred when checking the
share_hash_tree, which suggests that the share hashes we
received from the remote peer were bad.""")
- log.msg(" have self._share_hash: %s" % bool(self._share_hash))
- log.msg(" block length: %d" % len(blockdata))
- log.msg(" block hash: %s" % base32.b2a_or_none(blockhash))
+ self.log(" have self._share_hash: %s" % bool(self._share_hash))
+ self.log(" block length: %d" % len(blockdata))
+ self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
if len(blockdata) < 100:
- log.msg(" block data: %r" % (blockdata,))
+ self.log(" block data: %r" % (blockdata,))
else:
- log.msg(" block data start/end: %r .. %r" %
+ self.log(" block data start/end: %r .. %r" %
(blockdata[:50], blockdata[-50:]))
- log.msg(" root hash: %s" % base32.b2a(self._share_root_hash))
- log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
- log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
+ self.log(" root hash: %s" % base32.b2a(self._share_root_hash))
+ self.log(" share hash tree:\n" + self.share_hash_tree.dump())
+ self.log(" block hash tree:\n" + self.block_hash_tree.dump())
lines = []
for i,h in sorted(sharehashes):
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
- log.msg(" sharehashes:\n" + "\n".join(lines) + "\n")
+ self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
lines = []
for i,h in enumerate(blockhashes):
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
-class BlockDownloader:
+class BlockDownloader(log.PrefixingLogMixin):
"""I am responsible for downloading a single block (from a single bucket)
for a single segment.
"""
def __init__(self, vbucket, blocknum, parent, results):
+ prefix = "%s-%d" % (vbucket, blocknum)
+ log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self.vbucket = vbucket
self.blocknum = blocknum
self.parent = parent
self.results = results
- self._log_number = self.parent.log("starting block %d" % blocknum)
-
- def log(self, *args, **kwargs):
- if "parent" not in kwargs:
- kwargs["parent"] = self._log_number
- return self.parent.log(*args, **kwargs)
def start(self, segnum):
- lognum = self.log("get_block(segnum=%d)" % segnum)
+ self.log("get_block(segnum=%d)" % segnum)
started = time.time()
d = self.vbucket.get_block(segnum)
d.addCallbacks(self._hold_block, self._got_block_error,
- callbackArgs=(started, lognum,), errbackArgs=(lognum,))
+ callbackArgs=(started,))
return d
- def _hold_block(self, data, started, lognum):
+ def _hold_block(self, data, started):
if self.results:
elapsed = time.time() - started
peerid = self.vbucket.bucket.get_peerid()
if peerid not in self.results.timings["fetch_per_server"]:
self.results.timings["fetch_per_server"][peerid] = []
self.results.timings["fetch_per_server"][peerid].append(elapsed)
- self.log("got block", parent=lognum)
+ self.log("got block")
self.parent.hold_block(self.blocknum, data)
- def _got_block_error(self, f, lognum):
+ def _got_block_error(self, f):
level = log.WEIRD
if f.check(DeadReferenceError):
level = log.UNUSUAL
self.log("BlockDownloader[%d] got error" % self.blocknum,
- failure=f, level=level, parent=lognum, umid="5Z4uHQ")
+ failure=f, level=level, umid="5Z4uHQ")
if self.results:
peerid = self.vbucket.bucket.get_peerid()
self.results.server_problems[peerid] = str(f)
def set_results(self, value):
self.results = value
-class FileDownloader:
+class FileDownloader(log.PrefixingLogMixin):
implements(IPushProducer)
_status = None
def __init__(self, client, u, downloadable):
precondition(isinstance(u, uri.CHKFileURI), u)
+
+ prefix=base32.b2a_l(u.get_storage_index()[:8], 60)
+ log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self._client = client
self._uri = u
- self._storage_index = u.storage_index
+ self._storage_index = u.get_storage_index()
self._uri_extension_hash = u.uri_extension_hash
self._vup = None # ValidatedExtendedURIProxy
- self._si_s = storage.si_b2a(self._storage_index)
- self.init_logging()
-
self._started = time.time()
self._status = s = DownloadStatus()
s.set_status("Starting")
if IConsumer.providedBy(downloadable):
downloadable.registerProducer(self, True)
self._downloadable = downloadable
- self._output = Output(downloadable, u.key, self._uri.size, self._log_number,
+ self._output = Output(downloadable, u.key, self._uri.size, self._parentmsgid,
self._status)
self.active_buckets = {} # k: shnum, v: bucket
self._crypttext_hash_tree = None
- def init_logging(self):
- self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
- num = self._client.log(format="FileDownloader(%(si)s): starting",
- si=storage.si_b2a(self._storage_index))
- self._log_number = num
-
- def log(self, *args, **kwargs):
- if "parent" not in kwargs:
- kwargs["parent"] = self._log_number
- if "facility" not in kwargs:
- kwargs["facility"] = "tahoe.download"
- return log.msg(*args, **kwargs)
-
def pauseProducing(self):
if self._paused:
return
level = log.WEIRD
if f.check(DeadReferenceError):
level = log.UNUSUAL
- self._client.log("Error during get_buckets", failure=f, level=level,
+ self.log("Error during get_buckets", failure=f, level=level,
umid="3uuBUQ")
def bucket_failed(self, vbucket):
vups = []
for sharenum, bucket in self._share_buckets:
vups.append(ValidatedExtendedURIProxy(bucket, self._uri.get_verify_cap(), self._fetch_failures))
- vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._log_number)
+ vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
d = vto.start()
def _got_uri_extension(vup):
if self._status:
self._status.set_status("Retrieving crypttext hash tree")
- vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._log_number)
+ vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid)
d = vto.start()
def _got_crypttext_hash_tree(res):