offloaded: improve logging across the board
authorBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:11:35 +0000 (01:11 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:11:35 +0000 (01:11 -0700)
src/allmydata/download.py
src/allmydata/encode.py
src/allmydata/offloaded.py
src/allmydata/upload.py

index a0a0367ec3f15278d5e2ed4ded7997eeb4820a1e..28810a5fb2c36aa217d248968654318c89bb18c5 100644 (file)
@@ -1,13 +1,12 @@
 
 import os, random
 from zope.interface import implements
-from twisted.python import log
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.application import service
 from foolscap.eventual import eventually
 
-from allmydata.util import idlib, mathutil, hashutil
+from allmydata.util import idlib, mathutil, hashutil, log
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI
@@ -29,7 +28,7 @@ class DownloadStopped(Exception):
     pass
 
 class Output:
-    def __init__(self, downloadable, key, total_length):
+    def __init__(self, downloadable, key, total_length, log_parent):
         self.downloadable = downloadable
         self._decryptor = AES(key)
         self._crypttext_hasher = hashutil.crypttext_hasher()
@@ -40,6 +39,14 @@ class Output:
         self._plaintext_hash_tree = None
         self._crypttext_hash_tree = None
         self._opened = False
+        self._log_parent = log_parent
+
+    def log(self, *args, **kwargs):
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._log_parent
+        if "facility" not in kwargs:
+            kwargs["facility"] = "download.output"
+        return log.msg(*args, **kwargs)
 
     def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
         self._plaintext_hash_tree = plaintext_hashtree
@@ -56,6 +63,10 @@ class Output:
             ch = hashutil.crypttext_segment_hasher()
             ch.update(crypttext)
             crypttext_leaves = {self._segment_number: ch.digest()}
+            self.log(format="crypttext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
+                     bytes=len(crypttext),
+                     segnum=self._segment_number, hash=idlib.b2a(ch.digest()),
+                     level=log.NOISY)
             self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves)
 
         plaintext = self._decryptor.process(crypttext)
@@ -68,6 +79,10 @@ class Output:
             ph = hashutil.plaintext_segment_hasher()
             ph.update(plaintext)
             plaintext_leaves = {self._segment_number: ph.digest()}
+            self.log(format="plaintext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
+                     bytes=len(plaintext),
+                     segnum=self._segment_number, hash=idlib.b2a(ph.digest()),
+                     level=log.NOISY)
             self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
 
         self._segment_number += 1
@@ -79,12 +94,14 @@ class Output:
         self.downloadable.write(plaintext)
 
     def fail(self, why):
-        log.msg("UNUSUAL: download failed: %s" % why)
+        # this is really unusual, and deserves maximum forensics
+        self.log("download failed!", failure=why, level=log.SCARY)
         self.downloadable.fail(why)
 
     def close(self):
         self.crypttext_hash = self._crypttext_hasher.digest()
         self.plaintext_hash = self._plaintext_hasher.digest()
+        self.log("download finished, closing IDownloadable", level=log.NOISY)
         self.downloadable.close()
 
     def finish(self):
@@ -322,7 +339,7 @@ class FileDownloader:
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
         self._downloadable = downloadable
-        self._output = Output(downloadable, u.key, self._size)
+        self._output = Output(downloadable, u.key, self._size, self._log_number)
         self._paused = False
         self._stopped = False
 
@@ -342,15 +359,16 @@ class FileDownloader:
 
     def init_logging(self):
         self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
-        num = self._client.log("FileDownloader(%s): starting" % prefix)
+        num = self._client.log(format="FileDownloader(%(si)s): starting",
+                               si=idlib.b2a(self._storage_index))
         self._log_number = num
 
-    def log(self, msg, parent=None):
-        if parent is None:
-            parent = self._log_number
-        return self._client.log("FileDownloader(%s): %s" % (self._log_prefix,
-                                                            msg),
-                                parent=parent)
+    def log(self, *args, **kwargs):
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._log_number
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.download"
+        return log.msg(*args, **kwargs)
 
     def pauseProducing(self):
         if self._paused:
index b47af0372b0ee9405a6b1f05059a8de82e4d1f16..06c8e1668d559254fa75f9e3205f779b23bd19b3 100644 (file)
@@ -6,7 +6,7 @@ from foolscap import eventual
 from allmydata import uri
 from allmydata.hashtree import HashTree
 from allmydata.util import mathutil, hashutil, idlib, log
-from allmydata.util.assertutil import _assert
+from allmydata.util.assertutil import _assert, precondition
 from allmydata.codec import CRSEncoder
 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
      IEncryptedUploadable
@@ -73,13 +73,14 @@ PiB=1024*TiB
 class Encoder(object):
     implements(IEncoder)
 
-    def __init__(self, parent=None):
+    def __init__(self, log_parent=None):
         object.__init__(self)
         self.uri_extension_data = {}
         self._codec = None
-        self._parent = parent
-        if self._parent:
-            self._log_number = self._parent.log("creating Encoder %s" % self)
+        precondition(log_parent is None or isinstance(log_parent, int),
+                     log_parent)
+        self._log_number = log.msg("creating Encoder %s" % self,
+                                   facility="tahoe.encoder", parent=log_parent)
         self._aborted = False
 
     def __repr__(self):
@@ -88,16 +89,17 @@ class Encoder(object):
         return "<Encoder for unknown storage index>"
 
     def log(self, *args, **kwargs):
-        if not self._parent:
-            return
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
-        return self._parent.log(*args, **kwargs)
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.encoder"
+        return log.msg(*args, **kwargs)
 
     def set_encrypted_uploadable(self, uploadable):
         eu = self._uploadable = IEncryptedUploadable(uploadable)
         d = eu.get_size()
         def _got_size(size):
+            self.log(format="file size: %(size)d", size=size)
             self.file_size = size
         d.addCallback(_got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
@@ -193,8 +195,7 @@ class Encoder(object):
         self.landlords = landlords.copy()
 
     def start(self):
-        if self._parent:
-            self._log_number = self._parent.log("%s starting" % (self,))
+        self.log("%s starting" % (self,))
         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
         assert self._codec
         self._crypttext_hasher = hashutil.crypttext_hasher()
@@ -455,6 +456,8 @@ class Encoder(object):
                                                    self.num_segments)
         d.addCallback(_got)
         def _got_hashtree_leaves(leaves):
+            self.log("Encoder: got plaintext_hashtree_leaves: %s" %
+                     (",".join([idlib.b2a(h) for h in leaves]),))
             ht = list(HashTree(list(leaves)))
             self.uri_extension_data["plaintext_root_hash"] = ht[0]
             self._plaintext_hashtree_nodes = ht
index 30a340ea0a5fffc3821953865c888b3436a8e631..1412000c7b162d4931a6954772229f6896c5af92 100644 (file)
@@ -32,7 +32,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
                          parent=log_number)
 
         self._client = helper.parent
-        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file)
+        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
+                                             self._log_number)
         self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
         self._finished_observers = observer.OneShotObserverList()
 
@@ -102,16 +103,18 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
 class AskUntilSuccessMixin:
     # create me with a _reader array
+    _last_failure = None
 
     def add_reader(self, reader):
         self._readers.append(reader)
 
     def call(self, *args, **kwargs):
         if not self._readers:
-            raise NotEnoughWritersError("ran out of assisted uploaders")
+            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
         rr = self._readers[0]
         d = rr.callRemote(*args, **kwargs)
         def _err(f):
+            self._last_failure = f
             if rr in self._readers:
                 self._readers.remove(rr)
             self._upload_helper.log("call to assisted uploader %s failed" % rr,
@@ -135,15 +138,23 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
     the ciphertext to 'encoded_file'.
     """
 
-    def __init__(self, helper, incoming_file, encoded_file):
+    def __init__(self, helper, incoming_file, encoded_file, logparent):
         self._upload_helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoded_file
+        self._log_parent = logparent
         self._done_observers = observer.OneShotObserverList()
         self._readers = []
         self._started = False
         self._f = None
 
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._log_parent
+        return log.msg(*args, **kwargs)
+
     def add_reader(self, reader):
         AskUntilSuccessMixin.add_reader(self, reader)
         self._start()
@@ -161,12 +172,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         d.addErrback(self._failed)
 
     def _got_size(self, size):
+        self.log("total size is %d bytes" % size, level=log.NOISY)
         self._expected_size = size
 
     def _start_reading(self, res):
         # then find out how much crypttext we have on disk
         if os.path.exists(self._incoming_file):
             self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
+            self.log("we already have %d bytes" % self._have, level=log.NOISY)
         else:
             self._have = 0
         self._f = open(self._incoming_file, "wb")
@@ -200,10 +213,12 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         d = defer.maybeDeferred(self._fetch)
         def _done(finished):
             if finished:
+                self.log("finished reading ciphertext", level=log.NOISY)
                 fire_when_done.callback(None)
             else:
                 self._loop(fire_when_done)
         def _err(f):
+            self.log("ciphertext read failed", failure=f, level=log.UNUSUAL)
             fire_when_done.errback(f)
         d.addCallbacks(_done, _err)
         return None
@@ -213,6 +228,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         fetch_size = min(needed, self.CHUNK_SIZE)
         if fetch_size == 0:
             return True # all done
+        self.log("fetching %d-%d" % (self._have, self._have+fetch_size),
+                 level=log.NOISY)
         d = self.call("read_encrypted", self._have, fetch_size)
         def _got_data(ciphertext_v):
             for data in ciphertext_v:
@@ -241,6 +258,9 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         self._f.close()
         self._f = None
         self._readers = []
+        self.log(format="done fetching ciphertext, size=%(size)d",
+                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
+                 level=log.NOISY)
         os.rename(self._incoming_file, self._encoding_file)
         self._done_observers.fire(None)
 
index 1d339dfd85540b32e3eb05fe6e9ed6160ec3725c..e156b4a3ae9ea7ac67154475c485f8a90d44fb0c 100644 (file)
@@ -340,6 +340,11 @@ class EncryptAnUploadable:
         self._encoding_parameters = None
         self._file_size = None
 
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "upload.encryption"
+        return log.msg(*args, **kwargs)
+
     def get_size(self):
         if self._file_size is not None:
             return defer.succeed(self._file_size)
@@ -381,6 +386,8 @@ class EncryptAnUploadable:
             segsize = mathutil.next_multiple(segsize, k)
             self._segment_size = segsize # used by segment hashers
             self._encoding_parameters = (k, happy, n, segsize)
+            self.log("my encoding parameters: %s" %
+                     (self._encoding_parameters,), level=log.NOISY)
             return self._encoding_parameters
         d.addCallback(_got_pieces)
         return d
@@ -433,6 +440,14 @@ class EncryptAnUploadable:
                 # we've filled this segment
                 self._plaintext_segment_hashes.append(p.digest())
                 self._plaintext_segment_hasher = None
+                self.log("closed hash [%d]: %dB" %
+                         (len(self._plaintext_segment_hashes)-1,
+                          self._plaintext_segment_hashed_bytes),
+                         level=log.NOISY)
+                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
+                         segnum=len(self._plaintext_segment_hashes)-1,
+                         hash=idlib.b2a(p.digest()),
+                         level=log.NOISY)
 
             offset += this_segment
 
@@ -452,6 +467,8 @@ class EncryptAnUploadable:
             # memory: each chunk is destroyed as soon as we're done with it.
             while data:
                 chunk = data.pop(0)
+                log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
+                        level=log.NOISY)
                 self._plaintext_hasher.update(chunk)
                 self._update_segment_hash(chunk)
                 cryptdata.append(self._encryptor.process(chunk))
@@ -467,6 +484,13 @@ class EncryptAnUploadable:
             p, segment_left = self._get_segment_hasher()
             self._plaintext_segment_hashes.append(p.digest())
             del self._plaintext_segment_hasher
+            self.log("closing plaintext leaf hasher, hashed %d bytes" %
+                     self._plaintext_segment_hashed_bytes,
+                     level=log.NOISY)
+            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
+                     segnum=len(self._plaintext_segment_hashes)-1,
+                     hash=idlib.b2a(p.digest()),
+                     level=log.NOISY)
         assert len(self._plaintext_segment_hashes) == num_segments
         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
 
@@ -522,7 +546,7 @@ class CHKUploader:
     def start_encrypted(self, encrypted):
         eu = IEncryptedUploadable(encrypted)
 
-        self._encoder = e = encode.Encoder(self)
+        self._encoder = e = encode.Encoder(self._log_number)
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders)
         d.addCallback(self.set_shareholders, e)
@@ -637,6 +661,9 @@ class RemoteEncryptedUploadable(Referenceable):
         d.addCallback(_read)
         return d
     def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
+        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
+                (first, last-1, num_segments),
+                level=log.NOISY)
         d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
         d.addCallback(list)
         return d