From: Brian Warner Date: Thu, 17 Jan 2008 08:11:35 +0000 (-0700) Subject: offloaded: improve logging across the board X-Git-Tag: allmydata-tahoe-0.8.0~302 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/frontends/%22news.html/?a=commitdiff_plain;h=c597e67c2b859c4fd0419bf4e6c58ad7b0c8c311;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: improve logging across the board --- diff --git a/src/allmydata/download.py b/src/allmydata/download.py index a0a0367e..28810a5f 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -1,13 +1,12 @@ import os, random from zope.interface import implements -from twisted.python import log from twisted.internet import defer from twisted.internet.interfaces import IPushProducer, IConsumer from twisted.application import service from foolscap.eventual import eventually -from allmydata.util import idlib, mathutil, hashutil +from allmydata.util import idlib, mathutil, hashutil, log from allmydata.util.assertutil import _assert from allmydata import codec, hashtree, storage, uri from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI @@ -29,7 +28,7 @@ class DownloadStopped(Exception): pass class Output: - def __init__(self, downloadable, key, total_length): + def __init__(self, downloadable, key, total_length, log_parent): self.downloadable = downloadable self._decryptor = AES(key) self._crypttext_hasher = hashutil.crypttext_hasher() @@ -40,6 +39,14 @@ class Output: self._plaintext_hash_tree = None self._crypttext_hash_tree = None self._opened = False + self._log_parent = log_parent + + def log(self, *args, **kwargs): + if "parent" not in kwargs: + kwargs["parent"] = self._log_parent + if "facility" not in kwargs: + kwargs["facility"] = "download.output" + return log.msg(*args, **kwargs) def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree): self._plaintext_hash_tree = plaintext_hashtree @@ -56,6 +63,10 @@ class Output: ch = hashutil.crypttext_segment_hasher() ch.update(crypttext) crypttext_leaves = {self._segment_number: ch.digest()} + self.log(format="crypttext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s", + bytes=len(crypttext), + segnum=self._segment_number, hash=idlib.b2a(ch.digest()), + level=log.NOISY) self._crypttext_hash_tree.set_hashes(leaves=crypttext_leaves) plaintext = self._decryptor.process(crypttext) @@ -68,6 +79,10 @@ class Output: ph = hashutil.plaintext_segment_hasher() ph.update(plaintext) plaintext_leaves = {self._segment_number: ph.digest()} + self.log(format="plaintext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s", + bytes=len(plaintext), + segnum=self._segment_number, hash=idlib.b2a(ph.digest()), + level=log.NOISY) self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves) self._segment_number += 1 @@ -79,12 +94,14 @@ class Output: self.downloadable.write(plaintext) def fail(self, why): - log.msg("UNUSUAL: download failed: %s" % why) + # this is really unusual, and deserves maximum forensics + self.log("download failed!", failure=why, level=log.SCARY) self.downloadable.fail(why) def close(self): self.crypttext_hash = self._crypttext_hasher.digest() self.plaintext_hash = self._plaintext_hasher.digest() + self.log("download finished, closing IDownloadable", level=log.NOISY) self.downloadable.close() def finish(self): @@ -322,7 +339,7 @@ class FileDownloader: if IConsumer.providedBy(downloadable): downloadable.registerProducer(self, True) self._downloadable = downloadable - self._output = Output(downloadable, u.key, self._size) + self._output = Output(downloadable, u.key, self._size, self._log_number) self._paused = False self._stopped = False @@ -342,15 +359,16 @@ class FileDownloader: def init_logging(self): self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6] - num = self._client.log("FileDownloader(%s): starting" % prefix) + num = self._client.log(format="FileDownloader(%(si)s): starting", + si=idlib.b2a(self._storage_index)) self._log_number = num - def log(self, msg, parent=None): - if parent is None: - parent = self._log_number - return self._client.log("FileDownloader(%s): %s" % (self._log_prefix, - msg), - parent=parent) + def log(self, *args, **kwargs): + if "parent" not in kwargs: + kwargs["parent"] = self._log_number + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.download" + return log.msg(*args, **kwargs) def pauseProducing(self): if self._paused: diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index b47af037..06c8e166 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -6,7 +6,7 @@ from foolscap import eventual from allmydata import uri from allmydata.hashtree import HashTree from allmydata.util import mathutil, hashutil, idlib, log -from allmydata.util.assertutil import _assert +from allmydata.util.assertutil import _assert, precondition from allmydata.codec import CRSEncoder from allmydata.interfaces import IEncoder, IStorageBucketWriter, \ IEncryptedUploadable @@ -73,13 +73,14 @@ PiB=1024*TiB class Encoder(object): implements(IEncoder) - def __init__(self, parent=None): + def __init__(self, log_parent=None): object.__init__(self) self.uri_extension_data = {} self._codec = None - self._parent = parent - if self._parent: - self._log_number = self._parent.log("creating Encoder %s" % self) + precondition(log_parent is None or isinstance(log_parent, int), + log_parent) + self._log_number = log.msg("creating Encoder %s" % self, + facility="tahoe.encoder", parent=log_parent) self._aborted = False def __repr__(self): @@ -88,16 +89,17 @@ class Encoder(object): return "" def log(self, *args, **kwargs): - if not self._parent: - return if "parent" not in kwargs: kwargs["parent"] = self._log_number - return self._parent.log(*args, **kwargs) + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.encoder" + return log.msg(*args, **kwargs) def set_encrypted_uploadable(self, uploadable): eu = self._uploadable = IEncryptedUploadable(uploadable) d = eu.get_size() def _got_size(size): + self.log(format="file size: %(size)d", size=size) self.file_size = size d.addCallback(_got_size) d.addCallback(lambda res: eu.get_all_encoding_parameters()) @@ -193,8 +195,7 @@ class Encoder(object): self.landlords = landlords.copy() def start(self): - if self._parent: - self._log_number = self._parent.log("%s starting" % (self,)) + self.log("%s starting" % (self,)) #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) assert self._codec self._crypttext_hasher = hashutil.crypttext_hasher() @@ -455,6 +456,8 @@ class Encoder(object): self.num_segments) d.addCallback(_got) def _got_hashtree_leaves(leaves): + self.log("Encoder: got plaintext_hashtree_leaves: %s" % + (",".join([idlib.b2a(h) for h in leaves]),)) ht = list(HashTree(list(leaves))) self.uri_extension_data["plaintext_root_hash"] = ht[0] self._plaintext_hashtree_nodes = ht diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 30a340ea..1412000c 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -32,7 +32,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): parent=log_number) self._client = helper.parent - self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file) + self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file, + self._log_number) self._reader = LocalCiphertextReader(self, storage_index, encoding_file) self._finished_observers = observer.OneShotObserverList() @@ -102,16 +103,18 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): class AskUntilSuccessMixin: # create me with a _reader array + _last_failure = None def add_reader(self, reader): self._readers.append(reader) def call(self, *args, **kwargs): if not self._readers: - raise NotEnoughWritersError("ran out of assisted uploaders") + raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure) rr = self._readers[0] d = rr.callRemote(*args, **kwargs) def _err(f): + self._last_failure = f if rr in self._readers: self._readers.remove(rr) self._upload_helper.log("call to assisted uploader %s failed" % rr, @@ -135,15 +138,23 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): the ciphertext to 'encoded_file'. """ - def __init__(self, helper, incoming_file, encoded_file): + def __init__(self, helper, incoming_file, encoded_file, logparent): self._upload_helper = helper self._incoming_file = incoming_file self._encoding_file = encoded_file + self._log_parent = logparent self._done_observers = observer.OneShotObserverList() self._readers = [] self._started = False self._f = None + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.helper.chkupload.fetch" + if "parent" not in kwargs: + kwargs["parent"] = self._log_parent + return log.msg(*args, **kwargs) + def add_reader(self, reader): AskUntilSuccessMixin.add_reader(self, reader) self._start() @@ -161,12 +172,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): d.addErrback(self._failed) def _got_size(self, size): + self.log("total size is %d bytes" % size, level=log.NOISY) self._expected_size = size def _start_reading(self, res): # then find out how much crypttext we have on disk if os.path.exists(self._incoming_file): self._have = os.stat(self._incoming_file)[stat.ST_SIZE] + self.log("we already have %d bytes" % self._have, level=log.NOISY) else: self._have = 0 self._f = open(self._incoming_file, "wb") @@ -200,10 +213,12 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): d = defer.maybeDeferred(self._fetch) def _done(finished): if finished: + self.log("finished reading ciphertext", level=log.NOISY) fire_when_done.callback(None) else: self._loop(fire_when_done) def _err(f): + self.log("ciphertext read failed", failure=f, level=log.UNUSUAL) fire_when_done.errback(f) d.addCallbacks(_done, _err) return None @@ -213,6 +228,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): fetch_size = min(needed, self.CHUNK_SIZE) if fetch_size == 0: return True # all done + self.log("fetching %d-%d" % (self._have, self._have+fetch_size), + level=log.NOISY) d = self.call("read_encrypted", self._have, fetch_size) def _got_data(ciphertext_v): for data in ciphertext_v: @@ -241,6 +258,9 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): self._f.close() self._f = None self._readers = [] + self.log(format="done fetching ciphertext, size=%(size)d", + size=os.stat(self._incoming_file)[stat.ST_SIZE], + level=log.NOISY) os.rename(self._incoming_file, self._encoding_file) self._done_observers.fire(None) diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 1d339dfd..e156b4a3 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -340,6 +340,11 @@ class EncryptAnUploadable: self._encoding_parameters = None self._file_size = None + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "upload.encryption" + return log.msg(*args, **kwargs) + def get_size(self): if self._file_size is not None: return defer.succeed(self._file_size) @@ -381,6 +386,8 @@ class EncryptAnUploadable: segsize = mathutil.next_multiple(segsize, k) self._segment_size = segsize # used by segment hashers self._encoding_parameters = (k, happy, n, segsize) + self.log("my encoding parameters: %s" % + (self._encoding_parameters,), level=log.NOISY) return self._encoding_parameters d.addCallback(_got_pieces) return d @@ -433,6 +440,14 @@ class EncryptAnUploadable: # we've filled this segment self._plaintext_segment_hashes.append(p.digest()) self._plaintext_segment_hasher = None + self.log("closed hash [%d]: %dB" % + (len(self._plaintext_segment_hashes)-1, + self._plaintext_segment_hashed_bytes), + level=log.NOISY) + self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s", + segnum=len(self._plaintext_segment_hashes)-1, + hash=idlib.b2a(p.digest()), + level=log.NOISY) offset += this_segment @@ -452,6 +467,8 @@ class EncryptAnUploadable: # memory: each chunk is destroyed as soon as we're done with it. while data: chunk = data.pop(0) + log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk), + level=log.NOISY) self._plaintext_hasher.update(chunk) self._update_segment_hash(chunk) cryptdata.append(self._encryptor.process(chunk)) @@ -467,6 +484,13 @@ class EncryptAnUploadable: p, segment_left = self._get_segment_hasher() self._plaintext_segment_hashes.append(p.digest()) del self._plaintext_segment_hasher + self.log("closing plaintext leaf hasher, hashed %d bytes" % + self._plaintext_segment_hashed_bytes, + level=log.NOISY) + self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s", + segnum=len(self._plaintext_segment_hashes)-1, + hash=idlib.b2a(p.digest()), + level=log.NOISY) assert len(self._plaintext_segment_hashes) == num_segments return defer.succeed(tuple(self._plaintext_segment_hashes[first:last])) @@ -522,7 +546,7 @@ class CHKUploader: def start_encrypted(self, encrypted): eu = IEncryptedUploadable(encrypted) - self._encoder = e = encode.Encoder(self) + self._encoder = e = encode.Encoder(self._log_number) d = e.set_encrypted_uploadable(eu) d.addCallback(self.locate_all_shareholders) d.addCallback(self.set_shareholders, e) @@ -637,6 +661,9 @@ class RemoteEncryptedUploadable(Referenceable): d.addCallback(_read) return d def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments): + log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" % + (first, last-1, num_segments), + level=log.NOISY) d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments) d.addCallback(list) return d