From 33a5f8ba6be3905105607cc8a31b4737f52ba64d Mon Sep 17 00:00:00 2001
From: Brian Warner
Date: Mon, 19 Nov 2007 19:33:41 -0700
Subject: [PATCH] more hierarchical logging: download/upload/encode

---
 src/allmydata/download.py         | 25 ++++++++++--
 src/allmydata/encode.py           | 67 ++++++++++++++++++++-----------
 src/allmydata/test/test_upload.py |  1 -
 src/allmydata/upload.py           | 13 ++++--
 4 files changed, 74 insertions(+), 32 deletions(-)

diff --git a/src/allmydata/download.py b/src/allmydata/download.py
index 962473a7..88834667 100644
--- a/src/allmydata/download.py
+++ b/src/allmydata/download.py
@@ -219,17 +219,27 @@ class BlockDownloader:
         self.vbucket = vbucket
         self.blocknum = blocknum
         self.parent = parent
+        self._log_number = self.parent.log("starting block %d" % blocknum)
+
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self.parent.log(msg, parent=parent)
 
     def start(self, segnum):
+        lognum = self.log("get_block(segnum=%d)" % segnum)
         d = self.vbucket.get_block(segnum)
-        d.addCallbacks(self._hold_block, self._got_block_error)
+        d.addCallbacks(self._hold_block, self._got_block_error,
+                       callbackArgs=(lognum,), errbackArgs=(lognum,))
         return d
 
-    def _hold_block(self, data):
+    def _hold_block(self, data, lognum):
+        self.log("got block", parent=lognum)
         self.parent.hold_block(self.blocknum, data)
 
-    def _got_block_error(self, f):
-        log.msg("BlockDownloader[%d] got error: %s" % (self.blocknum, f))
+    def _got_block_error(self, f, lognum):
+        self.log("BlockDownloader[%d] got error: %s" % (self.blocknum, f),
+                 parent=lognum)
         self.parent.bucket_failed(self.vbucket)
 
 class SegmentDownloader:
@@ -244,6 +254,13 @@ class SegmentDownloader:
         self.segmentnumber = segmentnumber
         self.needed_blocks = needed_shares
         self.blocks = {} # k: blocknum, v: data
+        self._log_number = self.parent.log("starting segment %d" %
+                                           segmentnumber)
+
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self.parent.log(msg, parent=parent)
 
     def start(self):
         return self._download()
diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py
index 90ace0a5..4544cd7e 100644
--- a/src/allmydata/encode.py
+++ b/src/allmydata/encode.py
@@ -2,7 +2,6 @@
 
 from zope.interface import implements
 from twisted.internet import defer
-from twisted.python import log
 from foolscap import eventual
 from allmydata import uri
 from allmydata.hashtree import HashTree
@@ -76,7 +75,7 @@ class Encoder(object):
     TOTAL_SHARES = 10
     MAX_SEGMENT_SIZE = 1*MiB
 
-    def __init__(self, options={}):
+    def __init__(self, options={}, parent=None):
         object.__init__(self)
         self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
                                             self.MAX_SEGMENT_SIZE)
@@ -89,12 +88,22 @@ class Encoder(object):
         self.TOTAL_SHARES = n
         self.uri_extension_data = {}
         self._codec = None
+        self._parent = parent
+        if self._parent:
+            self._log_number = self._parent.log("starting Encoder %s" % self)
 
     def __repr__(self):
         if hasattr(self, "_storage_index"):
             return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
         return "<Encoder for unknown storage index>"
 
+    def log(self, msg, parent=None):
+        if not self._parent:
+            return
+        if parent is None:
+            parent = self._log_number
+        return self._parent.log(msg, parent=parent)
+
     def set_size(self, size):
         assert not self._codec
         self.file_size = size
@@ -105,6 +114,7 @@
         self.NEEDED_SHARES = k
         self.SHARES_OF_HAPPINESS = d
         self.TOTAL_SHARES = n
+        self.log("set_params: %d,%d,%d" % (k, d, n))
 
     def _setup_codec(self):
         self.num_shares = self.TOTAL_SHARES
@@ -205,6 +215,7 @@ class Encoder(object):
         self.landlords = landlords.copy()
 
     def start(self):
+        self.log("starting")
         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
         if not self._codec:
             self._setup_codec()
@@ -372,10 +383,11 @@ class Encoder(object):
         _assert(set(self.landlords.keys()).issubset(set(shareids)),
                 shareids=shareids, landlords=self.landlords)
         dl = []
+        lognum = self.log("send_segment(%d)" % segnum)
         for i in range(len(shares)):
             subshare = shares[i]
             shareid = shareids[i]
-            d = self.send_subshare(shareid, segnum, subshare)
+            d = self.send_subshare(shareid, segnum, subshare, lognum)
             dl.append(d)
             subshare_hash = hashutil.block_hash(subshare)
             #from allmydata.util import idlib
@@ -387,39 +399,46 @@ class Encoder(object):
 
         dl = self._gather_responses(dl)
         def _logit(res):
-            log.msg("%s uploaded %s / %s bytes (%d%%) of your file." %
-                    (self,
-                     self.segment_size*(segnum+1),
-                     self.segment_size*self.num_segments,
-                     100 * (segnum+1) / self.num_segments,
-                     ))
+            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
+                     (self,
+                      self.segment_size*(segnum+1),
+                      self.segment_size*self.num_segments,
+                      100 * (segnum+1) / self.num_segments,
+                      ))
             return res
         dl.addCallback(_logit)
         return dl
 
-    def send_subshare(self, shareid, segment_num, subshare):
+    def send_subshare(self, shareid, segment_num, subshare, lognum):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
+        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
+                           parent=lognum)
         d = sh.put_block(segment_num, subshare)
+        def _done(res):
+            self.log("put_block done", parent=lognum2)
+            return res
+        d.addCallback(_done)
         d.addErrback(self._remove_shareholder, shareid,
                      "segnum=%d" % segment_num)
         return d
 
     def _remove_shareholder(self, why, shareid, where):
-        log.msg("error while sending %s to shareholder=%d: %s" %
-                (where, shareid, why)) # UNUSUAL
+        ln = self.log("UNUSUAL: error while sending %s to shareholder=%d: %s" %
+                      (where, shareid, why))
         if shareid in self.landlords:
             del self.landlords[shareid]
         else:
             # even more UNUSUAL
-            log.msg(" weird, they weren't in our list of landlords")
+            self.log("WEIRD: they weren't in our list of landlords", parent=ln)
         if len(self.landlords) < self.shares_of_happiness:
             msg = "lost too many shareholders during upload: %s" % why
             raise NotEnoughPeersError(msg)
-        log.msg("but we can still continue with %s shares, we'll be happy "
-                "with at least %s" % (len(self.landlords),
-                                      self.shares_of_happiness))
+        self.log("but we can still continue with %s shares, we'll be happy "
+                 "with at least %s" % (len(self.landlords),
+                                       self.shares_of_happiness),
+                 parent=ln)
 
     def _gather_responses(self, dl):
         d = defer.DeferredList(dl, fireOnOneErrback=True)
@@ -452,7 +471,7 @@ class Encoder(object):
         return d
 
     def send_plaintext_hash_tree_to_all_shareholders(self):
-        log.msg("%s sending plaintext hash tree" % self)
+        self.log("sending plaintext hash tree")
         dl = []
         for shareid in self.landlords.keys():
             d = self.send_plaintext_hash_tree(shareid,
@@ -469,7 +488,7 @@ class Encoder(object):
         return d
 
     def send_crypttext_hash_tree_to_all_shareholders(self):
-        log.msg("%s sending crypttext hash tree" % self)
+        self.log("sending crypttext hash tree")
         t = HashTree(self._crypttext_hashes)
         all_hashes = list(t)
         self.uri_extension_data["crypttext_root_hash"] = t[0]
@@ -487,7 +506,7 @@ class Encoder(object):
         return d
 
     def send_all_subshare_hash_trees(self):
-        log.msg("%s sending subshare hash trees" % self)
+        self.log("sending subshare hash trees")
         dl = []
         for shareid,hashes in enumerate(self.subshare_hashes):
             # hashes is a list of the hashes of all subshares that were sent
@@ -514,7 +533,7 @@ class Encoder(object):
         # validate their share. This includes the share hash itself, but does
         # not include the top-level hash root (which is stored securely in
         # the URI instead).
-        log.msg("%s sending all share hash trees" % self)
+        self.log("sending all share hash trees")
         dl = []
         for h in self.share_root_hashes:
             assert h
@@ -540,7 +559,7 @@ class Encoder(object):
         return d
 
     def send_uri_extension_to_all_shareholders(self):
-        log.msg("%s: sending uri_extension" % self)
+        self.log("sending uri_extension")
         for k in ('crypttext_root_hash', 'crypttext_hash',
                   'plaintext_root_hash', 'plaintext_hash',
                   ):
@@ -559,7 +578,7 @@ class Encoder(object):
         return d
 
     def close_all_shareholders(self):
-        log.msg("%s: closing shareholders" % self)
+        self.log("closing shareholders")
         dl = []
         for shareid in self.landlords:
             d = self.landlords[shareid].close()
@@ -568,12 +587,12 @@ class Encoder(object):
         return self._gather_responses(dl)
 
     def done(self):
-        log.msg("%s: upload done" % self)
+        self.log("upload done")
         return (self.uri_extension_hash, self.required_shares, self.num_shares,
                 self.file_size)
 
     def err(self, f):
-        log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
+        self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
         if f.check(defer.FirstError):
             return f.value.subFailure
         return f
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index b7d11262..f4318565 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -139,7 +139,6 @@ class FakeClient:
     def __init__(self, mode="good", num_servers=50):
         self.mode = mode
         self.num_servers = num_servers
-        self.introducer_client = FakeIntroducerClient()
     def get_permuted_peers(self, storage_index, include_myself):
         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
                   for fakeid in range(self.num_servers) ]
diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py
index b300f95c..496568c7 100644
--- a/src/allmydata/upload.py
+++ b/src/allmydata/upload.py
@@ -420,10 +420,16 @@ class CHKUploader:
         self._client = client
         self._wait_for_numpeers = wait_for_numpeers
         self._options = options
+        self._log_number = self._client.log("CHKUploader starting")
 
     def set_params(self, encoding_parameters):
         self._encoding_parameters = encoding_parameters
 
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self._client.log(msg, parent=parent)
+
     def start(self, uploadable):
         """Start uploading the file.
 
@@ -431,7 +437,7 @@ class CHKUploader:
         string)."""
 
         uploadable = IUploadable(uploadable)
-        log.msg("starting upload of %s" % uploadable)
+        self.log("starting upload of %s" % uploadable)
 
         eu = EncryptAnUploadable(uploadable)
         d = self.start_encrypted(eu)
@@ -445,7 +451,7 @@ class CHKUploader:
     def start_encrypted(self, encrypted):
         eu = IEncryptedUploadable(encrypted)
 
-        e = encode.Encoder(self._options)
+        e = encode.Encoder(self._options, self)
         e.set_params(self._encoding_parameters)
         d = e.set_encrypted_uploadable(eu)
         def _wait_for_peers(res):
@@ -467,6 +473,7 @@ class CHKUploader:
     def locate_all_shareholders(self, encoder):
         storage_index = encoder.get_param("storage_index")
         upload_id = idlib.b2a(storage_index)[:6]
+        self.log("using storage index %s" % upload_id)
         peer_selector = self.peer_selector_class(upload_id)
 
         share_size = encoder.get_param("share_size")
@@ -484,7 +491,7 @@ class CHKUploader:
         """
         @param used_peers: a sequence of PeerTracker objects
         """
-        log.msg("_send_shares, used_peers is %s" % (used_peers,))
+        self.log("_send_shares, used_peers is %s" % (used_peers,))
         for peer in used_peers:
             assert isinstance(peer, PeerTracker)
             buckets = {}
-- 
2.45.2