From: Zooko O'Whielacronx Date: Wed, 17 Dec 2008 01:04:50 +0000 (-0700) Subject: immutable: use new logging mixins to simplify logging X-Git-Url: https://git.rkrishnan.org/simplejson/__init__.py.html?a=commitdiff_plain;h=d67a3fe4b12f68bc06561df6a7c690859bdfb284;p=tahoe-lafs%2Ftahoe-lafs.git immutable: use new logging mixins to simplify logging --- diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py index 69d6fb21..615e0f0a 100644 --- a/src/allmydata/immutable/checker.py +++ b/src/allmydata/immutable/checker.py @@ -166,6 +166,7 @@ class SimpleCHKFileVerifier(download.FileDownloader): def __init__(self, client, u, storage_index, k, N, size, ueb_hash): precondition(isinstance(u, CHKFileURI), u) + download.FileDownloader.__init__(self, client, u, None); self._client = client self._uri = u diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index e0cb9ac8..e8c0937d 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -335,7 +335,7 @@ class ValidatedExtendedURIProxy: d.addCallback(self._parse_and_validate) return d -class ValidatedReadBucketProxy: +class ValidatedReadBucketProxy(log.PrefixingLogMixin): """I am a front-end for a remote storage bucket, responsible for retrieving and validating data from that bucket. @@ -346,6 +346,8 @@ class ValidatedReadBucketProxy: share_hash_tree, share_root_hash, num_blocks): """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """ + prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60)) + log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) self.sharenum = sharenum self.bucket = bucket self._share_hash = None # None means not validated yet @@ -401,7 +403,7 @@ class ValidatedReadBucketProxy: self._share_hash = sht.get_leaf(self.sharenum) blockhash = hashutil.block_hash(blockdata) - #log.msg("checking block_hash(shareid=%d, blocknum=%d) len=%d " + #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d " # "%r .. %r: %s" % # (self.sharenum, blocknum, len(blockdata), # blockdata[:50], blockdata[-50:], base32.b2a(blockhash))) @@ -415,31 +417,31 @@ class ValidatedReadBucketProxy: except (hashtree.BadHashError, hashtree.NotEnoughHashesError): # log.WEIRD: indicates undetected disk/network error, or more # likely a programming error - log.msg("hash failure in block=%d, shnum=%d on %s" % + self.log("hash failure in block=%d, shnum=%d on %s" % (blocknum, self.sharenum, self.bucket)) if self._share_hash: - log.msg(""" failure occurred when checking the block_hash_tree. + self.log(""" failure occurred when checking the block_hash_tree. This suggests that either the block data was bad, or that the block hashes we received along with it were bad.""") else: - log.msg(""" the failure probably occurred when checking the + self.log(""" the failure probably occurred when checking the share_hash_tree, which suggests that the share hashes we received from the remote peer were bad.""") - log.msg(" have self._share_hash: %s" % bool(self._share_hash)) - log.msg(" block length: %d" % len(blockdata)) - log.msg(" block hash: %s" % base32.b2a_or_none(blockhash)) + self.log(" have self._share_hash: %s" % bool(self._share_hash)) + self.log(" block length: %d" % len(blockdata)) + self.log(" block hash: %s" % base32.b2a_or_none(blockhash)) if len(blockdata) < 100: - log.msg(" block data: %r" % (blockdata,)) + self.log(" block data: %r" % (blockdata,)) else: - log.msg(" block data start/end: %r .. %r" % + self.log(" block data start/end: %r .. %r" % (blockdata[:50], blockdata[-50:])) - log.msg(" root hash: %s" % base32.b2a(self._share_root_hash)) - log.msg(" share hash tree:\n" + self.share_hash_tree.dump()) - log.msg(" block hash tree:\n" + self.block_hash_tree.dump()) + self.log(" root hash: %s" % base32.b2a(self._share_root_hash)) + self.log(" share hash tree:\n" + self.share_hash_tree.dump()) + self.log(" block hash tree:\n" + self.block_hash_tree.dump()) lines = [] for i,h in sorted(sharehashes): lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) - log.msg(" sharehashes:\n" + "\n".join(lines) + "\n") + self.log(" sharehashes:\n" + "\n".join(lines) + "\n") lines = [] for i,h in enumerate(blockhashes): lines.append("%3d: %s" % (i, base32.b2a_or_none(h))) @@ -454,7 +456,7 @@ class ValidatedReadBucketProxy: -class BlockDownloader: +class BlockDownloader(log.PrefixingLogMixin): """I am responsible for downloading a single block (from a single bucket) for a single segment. @@ -462,41 +464,37 @@ class BlockDownloader: """ def __init__(self, vbucket, blocknum, parent, results): + prefix = "%s-%d" % (vbucket, blocknum) + log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) self.vbucket = vbucket self.blocknum = blocknum self.parent = parent self.results = results - self._log_number = self.parent.log("starting block %d" % blocknum) - - def log(self, *args, **kwargs): - if "parent" not in kwargs: - kwargs["parent"] = self._log_number - return self.parent.log(*args, **kwargs) def start(self, segnum): - lognum = self.log("get_block(segnum=%d)" % segnum) + self.log("get_block(segnum=%d)" % segnum) started = time.time() d = self.vbucket.get_block(segnum) d.addCallbacks(self._hold_block, self._got_block_error, - callbackArgs=(started, lognum,), errbackArgs=(lognum,)) + callbackArgs=(started,)) return d - def _hold_block(self, data, started, lognum): + def _hold_block(self, data, started): if self.results: elapsed = time.time() - started peerid = self.vbucket.bucket.get_peerid() if peerid not in self.results.timings["fetch_per_server"]: self.results.timings["fetch_per_server"][peerid] = [] self.results.timings["fetch_per_server"][peerid].append(elapsed) - self.log("got block", parent=lognum) + self.log("got block") self.parent.hold_block(self.blocknum, data) - def _got_block_error(self, f, lognum): + def _got_block_error(self, f): level = log.WEIRD if f.check(DeadReferenceError): level = log.UNUSUAL self.log("BlockDownloader[%d] got error" % self.blocknum, - failure=f, level=level, parent=lognum, umid="5Z4uHQ") + failure=f, level=level, umid="5Z4uHQ") if self.results: peerid = self.vbucket.bucket.get_peerid() self.results.server_problems[peerid] = str(f) @@ -626,22 +624,22 @@ class DownloadStatus: def set_results(self, value): self.results = value -class FileDownloader: +class FileDownloader(log.PrefixingLogMixin): implements(IPushProducer) _status = None def __init__(self, client, u, downloadable): precondition(isinstance(u, uri.CHKFileURI), u) + + prefix=base32.b2a_l(u.get_storage_index()[:8], 60) + log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix) self._client = client self._uri = u - self._storage_index = u.storage_index + self._storage_index = u.get_storage_index() self._uri_extension_hash = u.uri_extension_hash self._vup = None # ValidatedExtendedURIProxy - self._si_s = storage.si_b2a(self._storage_index) - self.init_logging() - self._started = time.time() self._status = s = DownloadStatus() s.set_status("Starting") @@ -665,7 +663,7 @@ class FileDownloader: if IConsumer.providedBy(downloadable): downloadable.registerProducer(self, True) self._downloadable = downloadable - self._output = Output(downloadable, u.key, self._uri.size, self._log_number, + self._output = Output(downloadable, u.key, self._uri.size, self._parentmsgid, self._status) self.active_buckets = {} # k: shnum, v: bucket @@ -676,19 +674,6 @@ class FileDownloader: self._crypttext_hash_tree = None - def init_logging(self): - self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] - num = self._client.log(format="FileDownloader(%(si)s): starting", - si=storage.si_b2a(self._storage_index)) - self._log_number = num - - def log(self, *args, **kwargs): - if "parent" not in kwargs: - kwargs["parent"] = self._log_number - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.download" - return log.msg(*args, **kwargs) - def pauseProducing(self): if self._paused: return @@ -788,7 +773,7 @@ class FileDownloader: level = log.WEIRD if f.check(DeadReferenceError): level = log.UNUSUAL - self._client.log("Error during get_buckets", failure=f, level=level, + self.log("Error during get_buckets", failure=f, level=level, umid="3uuBUQ") def bucket_failed(self, vbucket): @@ -831,7 +816,7 @@ class FileDownloader: vups = [] for sharenum, bucket in self._share_buckets: vups.append(ValidatedExtendedURIProxy(bucket, self._uri.get_verify_cap(), self._fetch_failures)) - vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._log_number) + vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid) d = vto.start() def _got_uri_extension(vup): @@ -866,7 +851,7 @@ class FileDownloader: if self._status: self._status.set_status("Retrieving crypttext hash tree") - vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._log_number) + vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._parentmsgid) d = vto.start() def _got_crypttext_hash_tree(res):