]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add...
authorZooko O'Whielacronx <zooko@zooko.com>
Mon, 19 Jul 2010 08:20:00 +0000 (01:20 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Mon, 19 Jul 2010 08:20:00 +0000 (01:20 -0700)
src/allmydata/immutable/upload.py
src/allmydata/storage/server.py
src/allmydata/test/test_upload.py

index ecbae82400331ebb88ca5805e1a6049f6a314d52..07045711254d49521a5e1fa9f171becc8af26ed7 100644 (file)
@@ -74,6 +74,9 @@ EXTENSION_SIZE = 1000
 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
 # this.
 
+def pretty_print_shnum_to_servers(s):
+    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
+
 class PeerTracker:
     def __init__(self, peerid, storage_server,
                  sharesize, blocksize, num_segments, num_share_hashes,
@@ -152,7 +155,7 @@ class PeerTracker:
                 del self.buckets[sharenum]
 
 
-class Tahoe2PeerSelector:
+class Tahoe2PeerSelector(log.PrefixingLogMixin):
 
     def __init__(self, upload_id, logparent=None, upload_status=None):
         self.upload_id = upload_id
@@ -163,7 +166,8 @@ class Tahoe2PeerSelector:
         self.num_peers_contacted = 0
         self.last_failure_msg = None
         self._status = IUploadStatus(upload_status)
-        self._log_parent = log.msg("%s starting" % self, parent=logparent)
+        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+        self.log("starting", level=log.OPERATIONAL)
 
     def __repr__(self):
         return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
@@ -268,10 +272,9 @@ class Tahoe2PeerSelector:
             ds.append(d)
             self.num_peers_contacted += 1
             self.query_count += 1
-            log.msg("asking peer %s for any existing shares for "
-                    "upload id %s"
-                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
-                    level=log.NOISY, parent=self._log_parent)
+            self.log("asking peer %s for any existing shares" %
+                     (idlib.shortnodeid_b2a(peer.peerid),),
+                    level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
         return dl
@@ -283,18 +286,18 @@ class Tahoe2PeerSelector:
         Tahoe2PeerSelector._existing_shares.
         """
         if isinstance(res, failure.Failure):
-            log.msg("%s got error during existing shares check: %s"
+            self.log("%s got error during existing shares check: %s"
                     % (idlib.shortnodeid_b2a(peer), res),
-                    level=log.UNUSUAL, parent=self._log_parent)
+                    level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
         else:
             buckets = res
             if buckets:
                 self.peers_with_shares.add(peer)
-            log.msg("response from peer %s: alreadygot=%s"
+            self.log("response to get_buckets() from peer %s: alreadygot=%s"
                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
-                    level=log.NOISY, parent=self._log_parent)
+                    level=log.NOISY)
             for bucket in buckets:
                 self.preexisting_shares.setdefault(bucket, set()).add(peer)
                 if self.homeless_shares and bucket in self.homeless_shares:
@@ -328,9 +331,8 @@ class Tahoe2PeerSelector:
             merged = merge_peers(self.preexisting_shares, self.use_peers)
             effective_happiness = servers_of_happiness(merged)
             if self.servers_of_happiness <= effective_happiness:
-                msg = ("peer selection successful for %s: %s" % (self,
-                            self._get_progress_message()))
-                log.msg(msg, parent=self._log_parent)
+                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
+                self.log(msg, level=log.OPERATIONAL)
                 return (self.use_peers, self.preexisting_shares)
             else:
                 # We're not okay right now, but maybe we can fix it by
@@ -374,8 +376,7 @@ class Tahoe2PeerSelector:
                                           self.needed_shares,
                                           self.servers_of_happiness,
                                           effective_happiness)
-                    log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
-                            % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
+                    self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (msg, self._get_progress_message()))
 
         if self.uncontacted_peers:
@@ -398,7 +399,7 @@ class Tahoe2PeerSelector:
         elif self.contacted_peers:
             # ask a peer that we've already asked.
             if not self._started_second_pass:
-                log.msg("starting second pass", parent=self._log_parent,
+                self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
@@ -436,20 +437,23 @@ class Tahoe2PeerSelector:
                                 self._get_progress_message()))
                 if self.last_failure_msg:
                     msg += " (%s)" % (self.last_failure_msg,)
-                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
+                self.log(msg, level=log.UNUSUAL)
                 return self._failed(msg)
             else:
                 # we placed enough to be happy, so we're done
                 if self._status:
                     self._status.set_status("Placed all shares")
+                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
+                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
+                self.log(msg, level=log.OPERATIONAL)
                 return (self.use_peers, self.preexisting_shares)
 
     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            log.msg("%s got error during peer selection: %s" % (peer, res),
-                    level=log.UNUSUAL, parent=self._log_parent)
+            self.log("%s got error during peer selection: %s" % (peer, res),
+                    level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
@@ -468,10 +472,10 @@ class Tahoe2PeerSelector:
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
-            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
+            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
                     % (idlib.shortnodeid_b2a(peer.peerid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
-                    level=log.NOISY, parent=self._log_parent)
+                    level=log.NOISY)
             progress = False
             for s in alreadygot:
                 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
@@ -914,7 +918,7 @@ class CHKUploader:
         @paran already_peers: a dict mapping sharenum to a set of peerids
                               that claim to already have this share
         """
-        self.log("_send_shares, upload_servers is %s" % (upload_servers,))
+        self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
         # record already-present shares in self._results
         self._results.preexisting_shares = len(already_peers)
 
@@ -928,6 +932,7 @@ class CHKUploader:
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
                 servermap.setdefault(shnum, set()).add(peer.peerid)
+        self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
         assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
         encoder.set_shareholders(buckets, servermap)
 
index 22ec1621ca6d582a1b135d8bf00b435eb7d33da2..1af9c8901d1a940b18b734b77d895beebd779b3c 100644 (file)
@@ -5,7 +5,7 @@ from twisted.application import service
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, IStatsProducer
-from allmydata.util import fileutil, log, time_format
+from allmydata.util import fileutil, idlib, log, time_format
 import allmydata # for __full_version__
 
 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
@@ -106,6 +106,9 @@ class StorageServer(service.MultiService, Referenceable):
                                    expiration_sharetypes)
         self.lease_checker.setServiceParent(self)
 
+    def __repr__(self):
+        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+
     def add_bucket_counter(self):
         statefile = os.path.join(self.storedir, "bucket_counter.state")
         self.bucket_counter = BucketCountingCrawler(self, statefile)
index ab5ffae539055ca53c949cba4520e6eff3a435fe..0cf9495f409f2059f3fa4f2db6aefe060cba68d4 100644 (file)
@@ -11,6 +11,7 @@ import allmydata # for __full_version__
 from allmydata import uri, monitor, client
 from allmydata.immutable import upload, encode
 from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
+from allmydata.util import log
 from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import DeferredListShouldSucceed
 from allmydata.test.no_network import GridTestMixin
@@ -710,6 +711,7 @@ def combinations(iterable, r):
 
 def is_happy_enough(servertoshnums, h, k):
     """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
+    print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k
     if len(servertoshnums) < h:
         return False
     # print "servertoshnums: ", servertoshnums, h, k
@@ -798,9 +800,9 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
     def _add_server(self, server_number, readonly=False):
         assert self.g, "I tried to find a grid at self.g, but failed"
         ss = self.g.make_server(server_number, readonly)
+        log.msg("just created a server, number: %s => %s" % (server_number, ss,))
         self.g.add_server(server_number, ss)
 
-
     def _add_server_with_share(self, server_number, share_number=None,
                                readonly=False):
         self._add_server(server_number, readonly)
@@ -861,7 +863,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         d.addCallback(_store_shares)
         return d
 
-
     def test_configure_parameters(self):
         self.basedir = self.mktemp()
         hooks = {0: self._set_up_nodes_extra_config}