from allmydata.util.assertutil import _assert, precondition
from allmydata.codec import CRSEncoder
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
- IEncryptedUploadable, IUploadStatus, NotEnoughSharesError, NoSharesError
+ IEncryptedUploadable, IUploadStatus, UploadHappinessError
+
"""
The goal of the encoder is to turn the original file into a series of
msg = "lost too many servers during upload (still have %d, want %d): %s" % \
(len(servers_left),
self.servers_of_happiness, why)
- if servers_left:
- raise NotEnoughSharesError(msg)
- else:
- raise NoSharesError(msg)
+ raise UploadHappinessError(msg)
self.log("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (len(servers_left),
self.servers_of_happiness),
d = defer.DeferredList(dl, fireOnOneErrback=True)
def _eatNotEnoughSharesError(f):
# all exceptions that occur while talking to a peer are handled
- # in _remove_shareholder. That might raise NotEnoughSharesError,
+ # in _remove_shareholder. That might raise UploadHappinessError,
# which will cause the DeferredList to errback but which should
- # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
+ # otherwise be consumed. Allow non-UploadHappinessError exceptions
# to pass through as an unhandled errback. We use this in lieu of
# consumeErrors=True to allow coding errors to be logged.
- f.trap(NotEnoughSharesError, NoSharesError)
+ f.trap(UploadHappinessError)
return None
for d0 in dl:
d0.addErrback(_eatNotEnoughSharesError)
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
- NotEnoughSharesError, NoSharesError, NoServersError, \
- InsufficientVersionError
+ NoServersError, InsufficientVersionError, UploadHappinessError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
def query_allocated(self):
d = self._storageserver.callRemote("get_buckets",
self.storage_index)
- d.addCallback(self._got_allocate_reply)
return d
- def _got_allocate_reply(self, buckets):
- return (self.peerid, buckets)
-
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
+ # Peers that are working normally, but full.
+ self.full_count = 0
self.error_count = 0
self.num_peers_contacted = 0
self.last_failure_msg = None
peer = self.readonly_peers.pop()
assert isinstance(peer, PeerTracker)
d = peer.query_allocated()
- d.addCallback(self._handle_existing_response)
+ d.addBoth(self._handle_existing_response, peer.peerid)
+ self.num_peers_contacted += 1
+ self.query_count += 1
+ log.msg("asking peer %s for any existing shares for upload id %s"
+ % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
+ level=log.NOISY, parent=self._log_parent)
+ if self._status:
+ self._status.set_status("Contacting Peer %s to find "
+ "any existing shares"
+ % idlib.shortnodeid_b2a(peer.peerid))
return d
- def _handle_existing_response(self, (peer, buckets)):
- for bucket in buckets:
- if should_add_server(self.preexisting_shares, peer, bucket):
- self.preexisting_shares[bucket] = peer
- if self.homeless_shares and bucket in self.homeless_shares:
- self.homeless_shares.remove(bucket)
+ def _handle_existing_response(self, res, peer):
+ if isinstance(res, failure.Failure):
+ log.msg("%s got error during existing shares check: %s"
+ % (idlib.shortnodeid_b2a(peer), res),
+ level=log.UNUSUAL, parent=self._log_parent)
+ self.error_count += 1
+ self.bad_query_count += 1
+ else:
+ buckets = res
+ log.msg("response from peer %s: alreadygot=%s"
+ % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
+ level=log.NOISY, parent=self._log_parent)
+ for bucket in buckets:
+ if should_add_server(self.preexisting_shares, peer, bucket):
+ self.preexisting_shares[bucket] = peer
+ if self.homeless_shares and bucket in self.homeless_shares:
+ self.homeless_shares.remove(bucket)
+ self.full_count += 1
+ self.bad_query_count += 1
return self._existing_shares()
def _loop(self):
items.append((servernum, sharelist))
return self._loop()
else:
- raise NotEnoughSharesError("shares could only be placed "
+ raise UploadHappinessError("shares could only be placed "
"on %d servers (%d were requested)" %
(len(effective_happiness),
self.servers_of_happiness))
msg = ("placed %d shares out of %d total (%d homeless), "
"want to place on %d servers, "
"sent %d queries to %d peers, "
- "%d queries placed some shares, %d placed none, "
- "got %d errors" %
+ "%d queries placed some shares, %d placed none "
+ "(of which %d placed none due to the server being"
+ " full and %d placed none due to an error)" %
(self.total_shares - len(self.homeless_shares),
self.total_shares, len(self.homeless_shares),
self.servers_of_happiness,
self.query_count, self.num_peers_contacted,
self.good_query_count, self.bad_query_count,
- self.error_count))
+ self.full_count, self.error_count))
msg = "peer selection failed for %s: %s" % (self, msg)
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
- if placed_shares:
- raise NotEnoughSharesError(msg)
- else:
- raise NoSharesError(msg)
+ raise UploadHappinessError(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
log.msg("%s got error during peer selection: %s" % (peer, res),
level=log.UNUSUAL, parent=self._log_parent)
self.error_count += 1
+ self.bad_query_count += 1
self.homeless_shares = list(shares_to_ask) + self.homeless_shares
if (self.uncontacted_peers
or self.contacted_peers
self.preexisting_shares[s] = peer.peerid
if s in self.homeless_shares:
self.homeless_shares.remove(s)
- progress = True
# the PeerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.
self.good_query_count += 1
else:
self.bad_query_count += 1
+ self.full_count += 1
if still_homeless:
# In networks with lots of space, this is very unusual and