# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
+def pretty_print_shnum_to_servers(s):
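+    """
+    Render a sharenum-to-servers map as one human-readable string, e.g.
+    "sh0: aaaaaaaa+bbbbbbbb, sh1: cccccccc", with each peerid shortened
+    by idlib.shortnodeid_b2a. (The peerids shown here are illustrative.)
+    """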
+    return ', '.join(["sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x)
+                                                 for x in v]))
+                      for k, v in s.iteritems()])
+
class PeerTracker:
def __init__(self, peerid, storage_server,
sharesize, blocksize, num_segments, num_share_hashes,
del self.buckets[sharenum]
-class Tahoe2PeerSelector:
+class Tahoe2PeerSelector(log.PrefixingLogMixin):
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.num_peers_contacted = 0
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
- self._log_parent = log.msg("%s starting" % self, parent=logparent)
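+ # PrefixingLogMixin supplies a self.log() that tags every message with
+ # the 'tahoe.immutable.upload' facility and the upload_id prefix, so we
+ # no longer need to thread parent=self._log_parent through each call.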
+ log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+ self.log("starting", level=log.OPERATIONAL)
def __repr__(self):
return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
ds.append(d)
self.num_peers_contacted += 1
self.query_count += 1
- log.msg("asking peer %s for any existing shares for "
- "upload id %s"
- % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
- level=log.NOISY, parent=self._log_parent)
+ self.log("asking peer %s for any existing shares" %
+ (idlib.shortnodeid_b2a(peer.peerid),),
+ level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
Tahoe2PeerSelector._existing_shares.
"""
if isinstance(res, failure.Failure):
- log.msg("%s got error during existing shares check: %s"
+ self.log("%s got error during existing shares check: %s"
% (idlib.shortnodeid_b2a(peer), res),
- level=log.UNUSUAL, parent=self._log_parent)
+ level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
else:
buckets = res
if buckets:
self.peers_with_shares.add(peer)
- log.msg("response from peer %s: alreadygot=%s"
+ self.log("response to get_buckets() from peer %s: alreadygot=%s"
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
- level=log.NOISY, parent=self._log_parent)
+ level=log.NOISY)
for bucket in buckets:
self.preexisting_shares.setdefault(bucket, set()).add(peer)
if self.homeless_shares and bucket in self.homeless_shares:
merged = merge_peers(self.preexisting_shares, self.use_peers)
effective_happiness = servers_of_happiness(merged)
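+ # effective_happiness is the size of a maximum matching between
+ # servers and distinct shares (see allmydata/util/happinessutil.py);
+ # placement succeeds once it reaches the happiness threshold.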
if self.servers_of_happiness <= effective_happiness:
- msg = ("peer selection successful for %s: %s" % (self,
- self._get_progress_message()))
- log.msg(msg, parent=self._log_parent)
+ msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
+ self.log(msg, level=log.OPERATIONAL)
return (self.use_peers, self.preexisting_shares)
else:
# We're not okay right now, but maybe we can fix it by
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
- log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
- % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
+ self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
return self._failed("%s (%s)" % (msg, self._get_progress_message()))
if self.uncontacted_peers:
elif self.contacted_peers:
# ask a peer that we've already asked.
if not self._started_second_pass:
- log.msg("starting second pass", parent=self._log_parent,
+ self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
- log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
+ self.log(msg, level=log.UNUSUAL)
return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
+ msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
+ self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
+ self.log(msg, level=log.OPERATIONAL)
return (self.use_peers, self.preexisting_shares)
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
- log.msg("%s got error during peer selection: %s" % (peer, res),
- level=log.UNUSUAL, parent=self._log_parent)
+ self.log("%s got error during peer selection: %s" % (peer, res),
+ level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
- log.msg("response from peer %s: alreadygot=%s, allocated=%s"
+ self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
% (idlib.shortnodeid_b2a(peer.peerid),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
- level=log.NOISY, parent=self._log_parent)
+ level=log.NOISY)
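+ # 'alreadygot' names the shnums this server already holds; 'allocated'
+ # names the shnums it has just reserved space for on our behalf.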
progress = False
for s in alreadygot:
self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
@param already_peers: a dict mapping sharenum to a set of peerids
that claim to already have this share
"""
- self.log("_send_shares, upload_servers is %s" % (upload_servers,))
+ self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
# record already-present shares in self._results
self._results.preexisting_shares = len(already_peers)
for shnum in peer.buckets:
self._peer_trackers[shnum] = peer
servermap.setdefault(shnum, set()).add(peer.peerid)
+ self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
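+ # servermap maps each shnum to the set of peerids that will hold that
+ # share; it accompanies the buckets passed to the encoder below.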
encoder.set_shareholders(buckets, servermap)
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
+from allmydata.util import log
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
def is_happy_enough(servertoshnums, h, k):
""" I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """
+ print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k
if len(servertoshnums) < h:
return False
# print "servertoshnums: ", servertoshnums, h, k
def _add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
ss = self.g.make_server(server_number, readonly)
+ log.msg("just created a server, number: %s => %s" % (server_number, ss,))
self.g.add_server(server_number, ss)
-
def _add_server_with_share(self, server_number, share_number=None,
readonly=False):
self._add_server(server_number, readonly)
d.addCallback(_store_shares)
return d
-
def test_configure_parameters(self):
self.basedir = self.mktemp()
hooks = {0: self._set_up_nodes_extra_config}