from foolscap import Referenceable
from allmydata.util import idlib, bencode
+from allmydata.util.idlib import peerid_to_short_string as shortid
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata import codec
for p in self.permuted:
assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
- self.peer_index = 0
- self.goodness_points = 0
- self.landlords = [] # list of (peerid, bucket_num, remotebucket)
- d = defer.maybeDeferred(self._check_next_peer)
+ d = defer.maybeDeferred(self._find_peers)
d.addCallback(self._got_enough_peers)
d.addCallback(self._compute_uri)
return d
def _compute_uri(self, params):
return "URI:%s" % bencode.bencode((self._verifierid, params))
+ def _build_not_enough_peers_error(self):
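+ # summarize which peers accepted, refused, or failed, so the
+ # NotEnoughPeersError message explains why the upload could not proceed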
+ yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
+ no = ",".join([shortid(p) for p in self.peers_who_said_no])
+ err = ",".join([shortid(p) for p in self.peers_who_had_errors])
+ msg = ("%s goodness, want %s, have %d "
+ "landlords, %d total peers, "
+ "peers:yes=%s;no=%s;err=%s" %
+ (self.goodness_points, self.target_goodness,
+ len(self.landlords), self._total_peers,
+ yes, no, err))
+ return msg
+
+ def _find_peers(self):
+ # this returns a Deferred which fires (with a meaningless value) when
+ # enough peers are found, or errbacks with a NotEnoughPeersError if
+ # not.
+ self.peer_index = 0
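+ # each peer that grants us a lease earns one goodness point; we keep
+ # asking until we reach self.target_goodness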
+ self.goodness_points = 0
+ self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+ return self._check_next_peer()
+
def _check_next_peer(self):
if self.debug:
log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
(len(self.permuted), self.goodness_points,
self.target_goodness, len(self.landlords),
self._total_peers))
- if len(self.permuted) == 0:
- # there are no more to check
- yes = ",".join([idlib.peerid_to_short_string(p)
- for p in self.peers_who_said_yes])
- no = ",".join([idlib.peerid_to_short_string(p)
- for p in self.peers_who_said_no])
- err = ",".join([idlib.peerid_to_short_string(p)
- for p in self.peers_who_had_errors])
- msg = ("%s goodness, want %s, have %d "
- "landlords, %d total peers, "
- "peers:yes=%s;no=%s;err=%s" %
- (self.goodness_points, self.target_goodness,
- len(self.landlords), self._total_peers,
- yes, no, err))
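+ # if we have already placed enough shares, we can stop looking for
+ # more peers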
+ if (self.goodness_points >= self.target_goodness and
+ len(self.landlords) >= self.min_shares):
+ if self.debug: print " we're done!"
+ return "done"
+ if not self.permuted:
+ # we've run out of peers to check without finding enough, which
+ # means we won't be able to upload this file. Bummer.
+ msg = self._build_not_enough_peers_error()
log.msg("NotEnoughPeersError: %s" % msg)
raise NotEnoughPeersError(msg)
+
+ # otherwise we use self.peer_index to rotate through all the usable
+ # peers. It gets incremented elsewhere, but wrapped back to zero here.
if self.peer_index >= len(self.permuted):
self.peer_index = 0
peerid = self.permuted[self.peer_index]
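+ # ask this one peer, then loop back to re-evaluate. Each pass either
+ # earns a goodness point or removes a peer from self.permuted, so the
+ # recursion is bounded.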
+ d = self._check_peer(peerid)
+ d.addCallback(lambda res: self._check_next_peer())
+ return d
+
+ def _check_peer(self, peerid):
+ # contact a single peer, and ask them to hold a share. If they say
+ # yes, we update self.landlords and self.goodness_points, and
+ # increment self.peer_index. If they say no, or are uncontactable, we
+ # remove them from self.permuted. This returns a Deferred which never
+ # errbacks.
+
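+ # this share gets the next sequential bucket number (one bucket per
+ # accepted share so far)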
+ bucket_num = len(self.landlords)
d = self._peer.get_remote_service(peerid, "storageserver")
def _got_peer(service):
- bucket_num = len(self.landlords)
- if self.debug: print "asking %s" % idlib.b2a(peerid)
+ if self.debug: print "asking %s" % shortid(peerid)
d2 = service.callRemote("allocate_bucket",
verifierid=self._verifierid,
bucket_num=bucket_num,
size=self._share_size,
leaser=self._peer.nodeid,
canary=Referenceable())
- def _allocate_response(bucket):
- if self.debug:
- print " peerid %s will grant us a lease" % idlib.b2a(peerid)
- self.peers_who_said_yes.append(peerid)
- self.landlords.append( (peerid, bucket_num, bucket) )
- self.goodness_points += 1
- if (self.goodness_points >= self.target_goodness and
- len(self.landlords) >= self.min_shares):
- if self.debug: print " we're done!"
- raise HaveAllPeersError()
- # otherwise we fall through to allocate more peers
- d2.addCallback(_allocate_response)
return d2
d.addCallback(_got_peer)
- def _done_with_peer(res):
- if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
- if isinstance(res, failure.Failure):
- if res.check(HaveAllPeersError):
- if self.debug: print " all done"
- # we're done!
- return
- if res.check(TooFullError):
- if self.debug: print " too full"
- self.peers_who_said_no.append(peerid)
- elif res.check(IndexError):
- if self.debug: print " no connection"
- self.peers_who_had_errors.append(peerid)
- else:
- if self.debug: print " other error:", res
- self.peers_who_had_errors.append(peerid)
- self.permuted.remove(peerid) # this peer was unusable
+
+ def _allocate_response(bucket):
+ if self.debug:
+ print " peerid %s will grant us a lease" % shortid(peerid)
+ self.peers_who_said_yes.append(peerid)
+ self.landlords.append( (peerid, bucket_num, bucket) )
+ self.goodness_points += 1
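+ # advance so the next share is offered to a different peer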
+ self.peer_index += 1
+
+ d.addCallback(_allocate_response)
+
+ def _err(f):
+ if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
+ assert isinstance(f, failure.Failure)
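+ # classify the failure: TooFullError means the peer answered but has
+ # no room; anything else is recorded as a peer error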
+ if f.check(TooFullError):
+ if self.debug: print " too full"
+ self.peers_who_said_no.append(peerid)
+ elif f.check(IndexError):
+ if self.debug: print " no connection"
+ self.peers_who_had_errors.append(peerid)
else:
- if self.debug: print " they gave us a lease"
- # we get here for either good peers (when we still need
- # more), or after checking a bad peer (and thus still need
- # more). So now we need to grab a new peer.
- self.peer_index += 1
- return self._check_next_peer()
- d.addBoth(_done_with_peer)
+ if self.debug: print " other error:", res
+ self.peers_who_had_errors.append(peerid)
+ log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
+ log.msg(f)
+ self.permuted.remove(peerid) # this peer was unusable
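+ # returning None (rather than re-raising) consumes the failure, so
+ # the callback chain in _check_next_peer keeps running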
+ return None
+ d.addErrback(_err)
return d
def _got_enough_peers(self, res):