]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
more hierarchical logging: download/upload/encode
authorBrian Warner <warner@allmydata.com>
Tue, 20 Nov 2007 02:33:41 +0000 (19:33 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 20 Nov 2007 02:33:41 +0000 (19:33 -0700)
src/allmydata/download.py
src/allmydata/encode.py
src/allmydata/test/test_upload.py
src/allmydata/upload.py

index 962473a75cf49dd59d375dea02460706e1fca8b3..888346672393eb28d1547f7fb75122beb7ecc04b 100644 (file)
@@ -219,17 +219,27 @@ class BlockDownloader:
         self.vbucket = vbucket
         self.blocknum = blocknum
         self.parent = parent
+        self._log_number = self.parent.log("starting block %d" % blocknum)
+
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self.parent.log(msg, parent=parent)
 
     def start(self, segnum):
+        lognum = self.log("get_block(segnum=%d)" % segnum)
         d = self.vbucket.get_block(segnum)
-        d.addCallbacks(self._hold_block, self._got_block_error)
+        d.addCallbacks(self._hold_block, self._got_block_error,
+                       callbackArgs=(lognum,), errbackArgs=(lognum,))
         return d
 
-    def _hold_block(self, data):
+    def _hold_block(self, data, lognum):
+        self.log("got block", parent=lognum)
         self.parent.hold_block(self.blocknum, data)
 
-    def _got_block_error(self, f):
-        log.msg("BlockDownloader[%d] got error: %s" % (self.blocknum, f))
+    def _got_block_error(self, f, lognum):
+        self.log("BlockDownloader[%d] got error: %s" % (self.blocknum, f),
+                 parent=lognum)
         self.parent.bucket_failed(self.vbucket)
 
 class SegmentDownloader:
@@ -244,6 +254,13 @@ class SegmentDownloader:
         self.segmentnumber = segmentnumber
         self.needed_blocks = needed_shares
         self.blocks = {} # k: blocknum, v: data
+        self._log_number = self.parent.log("starting segment %d" %
+                                           segmentnumber)
+
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self.parent.log(msg, parent=parent)
 
     def start(self):
         return self._download()
index 90ace0a5d34b098688feb6567cfd726079324d98..4544cd7e124441518c4ca0deb2d1f6691ecec0b8 100644 (file)
@@ -2,7 +2,6 @@
 
 from zope.interface import implements
 from twisted.internet import defer
-from twisted.python import log
 from foolscap import eventual
 from allmydata import uri
 from allmydata.hashtree import HashTree
@@ -76,7 +75,7 @@ class Encoder(object):
     TOTAL_SHARES = 10
     MAX_SEGMENT_SIZE = 1*MiB
 
-    def __init__(self, options={}):
+    def __init__(self, options={}, parent=None):
         object.__init__(self)
         self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
                                             self.MAX_SEGMENT_SIZE)
@@ -89,12 +88,22 @@ class Encoder(object):
         self.TOTAL_SHARES = n
         self.uri_extension_data = {}
         self._codec = None
+        self._parent = parent
+        if self._parent:
+            self._log_number = self._parent.log("starting Encoder %s" % self)
 
     def __repr__(self):
         if hasattr(self, "_storage_index"):
             return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
         return "<Encoder for unknown storage index>"
 
+    def log(self, msg, parent=None):
+        if not self._parent:
+            return
+        if parent is None:
+            parent = self._log_number
+        return self._parent.log(msg, parent=parent)
+
     def set_size(self, size):
         assert not self._codec
         self.file_size = size
@@ -105,6 +114,7 @@ class Encoder(object):
         self.NEEDED_SHARES = k
         self.SHARES_OF_HAPPINESS = d
         self.TOTAL_SHARES = n
+        self.log("set_params: %d,%d,%d" % (k, d, n))
 
     def _setup_codec(self):
         self.num_shares = self.TOTAL_SHARES
@@ -205,6 +215,7 @@ class Encoder(object):
         self.landlords = landlords.copy()
 
     def start(self):
+        self.log("starting")
         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
         if not self._codec:
             self._setup_codec()
@@ -372,10 +383,11 @@ class Encoder(object):
         _assert(set(self.landlords.keys()).issubset(set(shareids)),
                 shareids=shareids, landlords=self.landlords)
         dl = []
+        lognum = self.log("send_segment(%d)" % segnum)
         for i in range(len(shares)):
             subshare = shares[i]
             shareid = shareids[i]
-            d = self.send_subshare(shareid, segnum, subshare)
+            d = self.send_subshare(shareid, segnum, subshare, lognum)
             dl.append(d)
             subshare_hash = hashutil.block_hash(subshare)
             #from allmydata.util import idlib
@@ -387,39 +399,46 @@ class Encoder(object):
 
         dl = self._gather_responses(dl)
         def _logit(res):
-            log.msg("%s uploaded %s / %s bytes (%d%%) of your file." %
-                    (self,
-                     self.segment_size*(segnum+1),
-                     self.segment_size*self.num_segments,
-                     100 * (segnum+1) / self.num_segments,
-                     ))
+            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
+                     (self,
+                      self.segment_size*(segnum+1),
+                      self.segment_size*self.num_segments,
+                      100 * (segnum+1) / self.num_segments,
+                      ))
             return res
         dl.addCallback(_logit)
         return dl
 
-    def send_subshare(self, shareid, segment_num, subshare):
+    def send_subshare(self, shareid, segment_num, subshare, lognum):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
+        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
+                           parent=lognum)
         d = sh.put_block(segment_num, subshare)
+        def _done(res):
+            self.log("put_block done", parent=lognum2)
+            return res
+        d.addCallback(_done)
         d.addErrback(self._remove_shareholder, shareid,
                      "segnum=%d" % segment_num)
         return d
 
     def _remove_shareholder(self, why, shareid, where):
-        log.msg("error while sending %s to shareholder=%d: %s" %
-                (where, shareid, why)) # UNUSUAL
+        ln = self.log("UNUSUAL: error while sending %s to shareholder=%d: %s" %
+                      (where, shareid, why))
         if shareid in self.landlords:
             del self.landlords[shareid]
         else:
             # even more UNUSUAL
-            log.msg(" weird, they weren't in our list of landlords")
+            self.log("WEIRD: they weren't in our list of landlords", parent=ln)
         if len(self.landlords) < self.shares_of_happiness:
             msg = "lost too many shareholders during upload: %s" % why
             raise NotEnoughPeersError(msg)
-        log.msg("but we can still continue with %s shares, we'll be happy "
-                "with at least %s" % (len(self.landlords),
-                                      self.shares_of_happiness))
+        self.log("but we can still continue with %s shares, we'll be happy "
+                 "with at least %s" % (len(self.landlords),
+                                       self.shares_of_happiness),
+                 parent=ln)
 
     def _gather_responses(self, dl):
         d = defer.DeferredList(dl, fireOnOneErrback=True)
@@ -452,7 +471,7 @@ class Encoder(object):
         return d
 
     def send_plaintext_hash_tree_to_all_shareholders(self):
-        log.msg("%s sending plaintext hash tree" % self)
+        self.log("sending plaintext hash tree")
         dl = []
         for shareid in self.landlords.keys():
             d = self.send_plaintext_hash_tree(shareid,
@@ -469,7 +488,7 @@ class Encoder(object):
         return d
 
     def send_crypttext_hash_tree_to_all_shareholders(self):
-        log.msg("%s sending crypttext hash tree" % self)
+        self.log("sending crypttext hash tree")
         t = HashTree(self._crypttext_hashes)
         all_hashes = list(t)
         self.uri_extension_data["crypttext_root_hash"] = t[0]
@@ -487,7 +506,7 @@ class Encoder(object):
         return d
 
     def send_all_subshare_hash_trees(self):
-        log.msg("%s sending subshare hash trees" % self)
+        self.log("sending subshare hash trees")
         dl = []
         for shareid,hashes in enumerate(self.subshare_hashes):
             # hashes is a list of the hashes of all subshares that were sent
@@ -514,7 +533,7 @@ class Encoder(object):
         # validate their share. This includes the share hash itself, but does
         # not include the top-level hash root (which is stored securely in
         # the URI instead).
-        log.msg("%s sending all share hash trees" % self)
+        self.log("sending all share hash trees")
         dl = []
         for h in self.share_root_hashes:
             assert h
@@ -540,7 +559,7 @@ class Encoder(object):
         return d
 
     def send_uri_extension_to_all_shareholders(self):
-        log.msg("%s: sending uri_extension" % self)
+        self.log("sending uri_extension")
         for k in ('crypttext_root_hash', 'crypttext_hash',
                   'plaintext_root_hash', 'plaintext_hash',
                   ):
@@ -559,7 +578,7 @@ class Encoder(object):
         return d
 
     def close_all_shareholders(self):
-        log.msg("%s: closing shareholders" % self)
+        self.log("closing shareholders")
         dl = []
         for shareid in self.landlords:
             d = self.landlords[shareid].close()
@@ -568,12 +587,12 @@ class Encoder(object):
         return self._gather_responses(dl)
 
     def done(self):
-        log.msg("%s: upload done" % self)
+        self.log("upload done")
         return (self.uri_extension_hash, self.required_shares,
                 self.num_shares, self.file_size)
 
     def err(self, f):
-        log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
+        self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
         if f.check(defer.FirstError):
             return f.value.subFailure
         return f
index b7d112626a48bc9bc8ac2bf8954e4b583cefdfc6..f4318565e313b46dc500771a9bd3b480f48b4e05 100644 (file)
@@ -139,7 +139,6 @@ class FakeClient:
     def __init__(self, mode="good", num_servers=50):
         self.mode = mode
         self.num_servers = num_servers
-        self.introducer_client = FakeIntroducerClient()
     def get_permuted_peers(self, storage_index, include_myself):
         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
                   for fakeid in range(self.num_servers) ]
index b300f95cd2671724737316667df2404aaa33dd12..496568c7e11a807120996b8f887cf948fd9b10b5 100644 (file)
@@ -420,10 +420,16 @@ class CHKUploader:
         self._client = client
         self._wait_for_numpeers = wait_for_numpeers
         self._options = options
+        self._log_number = self._client.log("CHKUploader starting")
 
     def set_params(self, encoding_parameters):
         self._encoding_parameters = encoding_parameters
 
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self._client.log(msg, parent=parent)
+
     def start(self, uploadable):
         """Start uploading the file.
 
@@ -431,7 +437,7 @@ class CHKUploader:
         string)."""
 
         uploadable = IUploadable(uploadable)
-        log.msg("starting upload of %s" % uploadable)
+        self.log("starting upload of %s" % uploadable)
 
         eu = EncryptAnUploadable(uploadable)
         d = self.start_encrypted(eu)
@@ -445,7 +451,7 @@ class CHKUploader:
     def start_encrypted(self, encrypted):
         eu = IEncryptedUploadable(encrypted)
 
-        e = encode.Encoder(self._options)
+        e = encode.Encoder(self._options, self)
         e.set_params(self._encoding_parameters)
         d = e.set_encrypted_uploadable(eu)
         def _wait_for_peers(res):
@@ -467,6 +473,7 @@ class CHKUploader:
     def locate_all_shareholders(self, encoder):
         storage_index = encoder.get_param("storage_index")
         upload_id = idlib.b2a(storage_index)[:6]
+        self.log("using storage index %s" % upload_id)
         peer_selector = self.peer_selector_class(upload_id)
 
         share_size = encoder.get_param("share_size")
@@ -484,7 +491,7 @@ class CHKUploader:
         """
         @param used_peers: a sequence of PeerTracker objects
         """
-        log.msg("_send_shares, used_peers is %s" % (used_peers,))
+        self.log("_send_shares, used_peers is %s" % (used_peers,))
         for peer in used_peers:
             assert isinstance(peer, PeerTracker)
         buckets = {}