self.vbucket = vbucket
self.blocknum = blocknum
self.parent = parent
+ self._log_number = self.parent.log("starting block %d" % blocknum)
+
+ def log(self, msg, parent=None):
+ if parent is None:
+ parent = self._log_number
+ return self.parent.log(msg, parent=parent)
def start(self, segnum):
+ lognum = self.log("get_block(segnum=%d)" % segnum)
d = self.vbucket.get_block(segnum)
- d.addCallbacks(self._hold_block, self._got_block_error)
+ d.addCallbacks(self._hold_block, self._got_block_error,
+ callbackArgs=(lognum,), errbackArgs=(lognum,))
return d
- def _hold_block(self, data):
+ def _hold_block(self, data, lognum):
+ self.log("got block", parent=lognum)
self.parent.hold_block(self.blocknum, data)
- def _got_block_error(self, f):
- log.msg("BlockDownloader[%d] got error: %s" % (self.blocknum, f))
+ def _got_block_error(self, f, lognum):
+ self.log("BlockDownloader[%d] got error: %s" % (self.blocknum, f),
+ parent=lognum)
self.parent.bucket_failed(self.vbucket)
class SegmentDownloader:
self.segmentnumber = segmentnumber
self.needed_blocks = needed_shares
self.blocks = {} # k: blocknum, v: data
+ self._log_number = self.parent.log("starting segment %d" %
+ segmentnumber)
+
+ def log(self, msg, parent=None):
+ if parent is None:
+ parent = self._log_number
+ return self.parent.log(msg, parent=parent)
def start(self):
return self._download()
from zope.interface import implements
from twisted.internet import defer
-from twisted.python import log
from foolscap import eventual
from allmydata import uri
from allmydata.hashtree import HashTree
TOTAL_SHARES = 10
MAX_SEGMENT_SIZE = 1*MiB
- def __init__(self, options={}):
+ def __init__(self, options={}, parent=None):
object.__init__(self)
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
self.MAX_SEGMENT_SIZE)
self.TOTAL_SHARES = n
self.uri_extension_data = {}
self._codec = None
+ self._parent = parent
+ if self._parent:
+ self._log_number = self._parent.log("starting Encoder %s" % self)
def __repr__(self):
if hasattr(self, "_storage_index"):
return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
return "<Encoder for unknown storage index>"
+ def log(self, msg, parent=None):
+ if not self._parent:
+ return
+ if parent is None:
+ parent = self._log_number
+ return self._parent.log(msg, parent=parent)
+
def set_size(self, size):
assert not self._codec
self.file_size = size
self.NEEDED_SHARES = k
self.SHARES_OF_HAPPINESS = d
self.TOTAL_SHARES = n
+ self.log("set_params: %d,%d,%d" % (k, d, n))
def _setup_codec(self):
self.num_shares = self.TOTAL_SHARES
self.landlords = landlords.copy()
def start(self):
+ self.log("starting")
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
if not self._codec:
self._setup_codec()
_assert(set(self.landlords.keys()).issubset(set(shareids)),
shareids=shareids, landlords=self.landlords)
dl = []
+ lognum = self.log("send_segment(%d)" % segnum)
for i in range(len(shares)):
subshare = shares[i]
shareid = shareids[i]
- d = self.send_subshare(shareid, segnum, subshare)
+ d = self.send_subshare(shareid, segnum, subshare, lognum)
dl.append(d)
subshare_hash = hashutil.block_hash(subshare)
#from allmydata.util import idlib
dl = self._gather_responses(dl)
def _logit(res):
- log.msg("%s uploaded %s / %s bytes (%d%%) of your file." %
- (self,
- self.segment_size*(segnum+1),
- self.segment_size*self.num_segments,
- 100 * (segnum+1) / self.num_segments,
- ))
+ self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
+ (self,
+ self.segment_size*(segnum+1),
+ self.segment_size*self.num_segments,
+ 100 * (segnum+1) / self.num_segments,
+ ))
return res
dl.addCallback(_logit)
return dl
- def send_subshare(self, shareid, segment_num, subshare):
+ def send_subshare(self, shareid, segment_num, subshare, lognum):
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
+ lognum2 = self.log("put_block to %s" % self.landlords[shareid],
+ parent=lognum)
d = sh.put_block(segment_num, subshare)
+ def _done(res):
+ self.log("put_block done", parent=lognum2)
+ return res
+ d.addCallback(_done)
d.addErrback(self._remove_shareholder, shareid,
"segnum=%d" % segment_num)
return d
def _remove_shareholder(self, why, shareid, where):
- log.msg("error while sending %s to shareholder=%d: %s" %
- (where, shareid, why)) # UNUSUAL
+ ln = self.log("UNUSUAL: error while sending %s to shareholder=%d: %s" %
+ (where, shareid, why))
if shareid in self.landlords:
del self.landlords[shareid]
else:
# even more UNUSUAL
- log.msg(" weird, they weren't in our list of landlords")
+ self.log("WEIRD: they weren't in our list of landlords", parent=ln)
if len(self.landlords) < self.shares_of_happiness:
msg = "lost too many shareholders during upload: %s" % why
raise NotEnoughPeersError(msg)
- log.msg("but we can still continue with %s shares, we'll be happy "
- "with at least %s" % (len(self.landlords),
- self.shares_of_happiness))
+ self.log("but we can still continue with %s shares, we'll be happy "
+ "with at least %s" % (len(self.landlords),
+ self.shares_of_happiness),
+ parent=ln)
def _gather_responses(self, dl):
d = defer.DeferredList(dl, fireOnOneErrback=True)
return d
def send_plaintext_hash_tree_to_all_shareholders(self):
- log.msg("%s sending plaintext hash tree" % self)
+ self.log("sending plaintext hash tree")
dl = []
for shareid in self.landlords.keys():
d = self.send_plaintext_hash_tree(shareid,
return d
def send_crypttext_hash_tree_to_all_shareholders(self):
- log.msg("%s sending crypttext hash tree" % self)
+ self.log("sending crypttext hash tree")
t = HashTree(self._crypttext_hashes)
all_hashes = list(t)
self.uri_extension_data["crypttext_root_hash"] = t[0]
return d
def send_all_subshare_hash_trees(self):
- log.msg("%s sending subshare hash trees" % self)
+ self.log("sending subshare hash trees")
dl = []
for shareid,hashes in enumerate(self.subshare_hashes):
# hashes is a list of the hashes of all subshares that were sent
# validate their share. This includes the share hash itself, but does
# not include the top-level hash root (which is stored securely in
# the URI instead).
- log.msg("%s sending all share hash trees" % self)
+ self.log("sending all share hash trees")
dl = []
for h in self.share_root_hashes:
assert h
return d
def send_uri_extension_to_all_shareholders(self):
- log.msg("%s: sending uri_extension" % self)
+ self.log("sending uri_extension")
for k in ('crypttext_root_hash', 'crypttext_hash',
'plaintext_root_hash', 'plaintext_hash',
):
return d
def close_all_shareholders(self):
- log.msg("%s: closing shareholders" % self)
+ self.log("closing shareholders")
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].close()
return self._gather_responses(dl)
def done(self):
- log.msg("%s: upload done" % self)
+ self.log("upload done")
return (self.uri_extension_hash, self.required_shares,
self.num_shares, self.file_size)
def err(self, f):
- log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
+ self.log("UNUSUAL: %s: upload failed: %s" % (self, f))
if f.check(defer.FirstError):
return f.value.subFailure
return f
self._client = client
self._wait_for_numpeers = wait_for_numpeers
self._options = options
+ self._log_number = self._client.log("CHKUploader starting")
def set_params(self, encoding_parameters):
self._encoding_parameters = encoding_parameters
+ def log(self, msg, parent=None):
+ if parent is None:
+ parent = self._log_number
+ return self._client.log(msg, parent=parent)
+
def start(self, uploadable):
"""Start uploading the file.
string)."""
uploadable = IUploadable(uploadable)
- log.msg("starting upload of %s" % uploadable)
+ self.log("starting upload of %s" % uploadable)
eu = EncryptAnUploadable(uploadable)
d = self.start_encrypted(eu)
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
- e = encode.Encoder(self._options)
+ e = encode.Encoder(self._options, self)
e.set_params(self._encoding_parameters)
d = e.set_encrypted_uploadable(eu)
def _wait_for_peers(res):
def locate_all_shareholders(self, encoder):
storage_index = encoder.get_param("storage_index")
upload_id = idlib.b2a(storage_index)[:6]
+ self.log("using storage index %s" % upload_id)
peer_selector = self.peer_selector_class(upload_id)
share_size = encoder.get_param("share_size")
"""
@param used_peers: a sequence of PeerTracker objects
"""
- log.msg("_send_shares, used_peers is %s" % (used_peers,))
+ self.log("_send_shares, used_peers is %s" % (used_peers,))
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}