From 0f94923f223a0d2dbfe69d064019d743a7eeeb03 Mon Sep 17 00:00:00 2001 From: Zooko O'Whielacronx Date: Mon, 19 Jul 2010 01:20:00 -0700 Subject: [PATCH] immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peer --- src/allmydata/immutable/upload.py | 49 +++++++++++++++++-------------- src/allmydata/storage/server.py | 5 +++- src/allmydata/test/test_upload.py | 5 ++-- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index ecbae824..07045711 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -74,6 +74,9 @@ EXTENSION_SIZE = 1000 # TODO: actual extensions are closer to 419 bytes, so we can probably lower # this. +def pretty_print_shnum_to_servers(s): + return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ]) + class PeerTracker: def __init__(self, peerid, storage_server, sharesize, blocksize, num_segments, num_share_hashes, @@ -152,7 +155,7 @@ class PeerTracker: del self.buckets[sharenum] -class Tahoe2PeerSelector: +class Tahoe2PeerSelector(log.PrefixingLogMixin): def __init__(self, upload_id, logparent=None, upload_status=None): self.upload_id = upload_id @@ -163,7 +166,8 @@ class Tahoe2PeerSelector: self.num_peers_contacted = 0 self.last_failure_msg = None self._status = IUploadStatus(upload_status) - self._log_parent = log.msg("%s starting" % self, parent=logparent) + log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) + self.log("starting", level=log.OPERATIONAL) def __repr__(self): return "" % self.upload_id @@ -268,10 +272,9 @@ class Tahoe2PeerSelector: ds.append(d) self.num_peers_contacted += 1 self.query_count += 1 - log.msg("asking peer %s for any existing shares for " - "upload id %s" - % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id), - level=log.NOISY, parent=self._log_parent) + self.log("asking peer %s for any existing shares" % + (idlib.shortnodeid_b2a(peer.peerid),), + level=log.NOISY) dl = defer.DeferredList(ds) dl.addCallback(lambda ign: self._loop()) return dl @@ -283,18 +286,18 @@ class Tahoe2PeerSelector: Tahoe2PeerSelector._existing_shares. """ if isinstance(res, failure.Failure): - log.msg("%s got error during existing shares check: %s" + self.log("%s got error during existing shares check: %s" % (idlib.shortnodeid_b2a(peer), res), - level=log.UNUSUAL, parent=self._log_parent) + level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 else: buckets = res if buckets: self.peers_with_shares.add(peer) - log.msg("response from peer %s: alreadygot=%s" + self.log("response to get_buckets() from peer %s: alreadygot=%s" % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), - level=log.NOISY, parent=self._log_parent) + level=log.NOISY) for bucket in buckets: self.preexisting_shares.setdefault(bucket, set()).add(peer) if self.homeless_shares and bucket in self.homeless_shares: @@ -328,9 +331,8 @@ class Tahoe2PeerSelector: merged = merge_peers(self.preexisting_shares, self.use_peers) effective_happiness = servers_of_happiness(merged) if self.servers_of_happiness <= effective_happiness: - msg = ("peer selection successful for %s: %s" % (self, - self._get_progress_message())) - log.msg(msg, parent=self._log_parent) + msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares))) + self.log(msg, level=log.OPERATIONAL) return (self.use_peers, self.preexisting_shares) else: # We're not okay right now, but maybe we can fix it by @@ -374,8 +376,7 @@ class Tahoe2PeerSelector: self.needed_shares, self.servers_of_happiness, effective_happiness) - log.msg("server selection unsuccessful for %r: %s (%s), merged=%r" - % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT) + self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT) return self._failed("%s (%s)" % (msg, self._get_progress_message())) if self.uncontacted_peers: @@ -398,7 +399,7 @@ class Tahoe2PeerSelector: elif self.contacted_peers: # ask a peer that we've already asked. if not self._started_second_pass: - log.msg("starting second pass", parent=self._log_parent, + self.log("starting second pass", level=log.NOISY) self._started_second_pass = True num_shares = mathutil.div_ceil(len(self.homeless_shares), @@ -436,20 +437,23 @@ class Tahoe2PeerSelector: self._get_progress_message())) if self.last_failure_msg: msg += " (%s)" % (self.last_failure_msg,) - log.msg(msg, level=log.UNUSUAL, parent=self._log_parent) + self.log(msg, level=log.UNUSUAL) return self._failed(msg) else: # we placed enough to be happy, so we're done if self._status: self._status.set_status("Placed all shares") + msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, + self._get_progress_message(), pretty_print_shnum_to_servers(merged))) + self.log(msg, level=log.OPERATIONAL) return (self.use_peers, self.preexisting_shares) def _got_response(self, res, peer, shares_to_ask, put_peer_here): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. - log.msg("%s got error during peer selection: %s" % (peer, res), - level=log.UNUSUAL, parent=self._log_parent) + self.log("%s got error during peer selection: %s" % (peer, res), + level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 self.homeless_shares = list(shares_to_ask) + self.homeless_shares @@ -468,10 +472,10 @@ class Tahoe2PeerSelector: self.last_failure_msg = msg else: (alreadygot, allocated) = res - log.msg("response from peer %s: alreadygot=%s, allocated=%s" + self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s" % (idlib.shortnodeid_b2a(peer.peerid), tuple(sorted(alreadygot)), tuple(sorted(allocated))), - level=log.NOISY, parent=self._log_parent) + level=log.NOISY) progress = False for s in alreadygot: self.preexisting_shares.setdefault(s, set()).add(peer.peerid) @@ -914,7 +918,7 @@ class CHKUploader: @paran already_peers: a dict mapping sharenum to a set of peerids that claim to already have this share """ - self.log("_send_shares, upload_servers is %s" % (upload_servers,)) + self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers)) # record already-present shares in self._results self._results.preexisting_shares = len(already_peers) @@ -928,6 +932,7 @@ class CHKUploader: for shnum in peer.buckets: self._peer_trackers[shnum] = peer servermap.setdefault(shnum, set()).add(peer.peerid) + self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])) assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]) encoder.set_shareholders(buckets, servermap) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 22ec1621..1af9c890 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -5,7 +5,7 @@ from twisted.application import service from zope.interface import implements from allmydata.interfaces import RIStorageServer, IStatsProducer -from allmydata.util import fileutil, log, time_format +from allmydata.util import fileutil, idlib, log, time_format import allmydata # for __full_version__ from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir @@ -106,6 +106,9 @@ class StorageServer(service.MultiService, Referenceable): expiration_sharetypes) self.lease_checker.setServiceParent(self) + def __repr__(self): + return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) + def add_bucket_counter(self): statefile = os.path.join(self.storedir, "bucket_counter.state") self.bucket_counter = BucketCountingCrawler(self, statefile) diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index ab5ffae5..0cf9495f 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -11,6 +11,7 @@ import allmydata # for __full_version__ from allmydata import uri, monitor, client from allmydata.immutable import upload, encode from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError +from allmydata.util import log from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata.test.no_network import GridTestMixin @@ -710,6 +711,7 @@ def combinations(iterable, r): def is_happy_enough(servertoshnums, h, k): """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """ + print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k if len(servertoshnums) < h: return False # print "servertoshnums: ", servertoshnums, h, k @@ -798,9 +800,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, def _add_server(self, server_number, readonly=False): assert self.g, "I tried to find a grid at self.g, but failed" ss = self.g.make_server(server_number, readonly) + log.msg("just created a server, number: %s => %s" % (server_number, ss,)) self.g.add_server(server_number, ss) - def _add_server_with_share(self, server_number, share_number=None, readonly=False): self._add_server(server_number, readonly) @@ -861,7 +863,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(_store_shares) return d - def test_configure_parameters(self): self.basedir = self.mktemp() hooks = {0: self._set_up_nodes_extra_config} -- 2.45.2