From: Brian Warner Date: Wed, 6 Jun 2007 19:40:16 +0000 (-0700) Subject: encode.py: clean up handling of lost peers during upload, add some logging X-Git-Url: https://git.rkrishnan.org/somewhere?a=commitdiff_plain;h=f4c048bbeba15f511a7d2cf48276979591deaa33;p=tahoe-lafs%2Ftahoe-lafs.git encode.py: clean up handling of lost peers during upload, add some logging --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 57b2c3e9..84faa770 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -254,7 +254,7 @@ class Encoder(object): dl.append(d) subshare_hash = block_hash(subshare) self.subshare_hashes[shareid].append(subshare_hash) - dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True) + dl = self._gather_responses(dl) def _logit(res): log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments)) return res @@ -273,10 +273,32 @@ class Encoder(object): def _remove_shareholder(self, why, shareid, where): log.msg("error while sending %s to shareholder=%d: %s" % (where, shareid, why)) # UNUSUAL - del self.landlords[shareid] + if shareid in self.landlords: + del self.landlords[shareid] + else: + # even more UNUSUAL + log.msg(" weird, they weren't in our list of landlords") if len(self.landlords) < self.shares_of_happiness: msg = "lost too many shareholders during upload" raise NotEnoughPeersError(msg) + log.msg("but we can still continue with %s shares, we'll be happy " + "with at least %s" % (len(self.landlords), + self.shares_of_happiness)) + + def _gather_responses(self, dl): + d = defer.DeferredList(dl, fireOnOneErrback=True) + def _eatNotEnoughPeersError(f): + # all exceptions that occur while talking to a peer are handled + # in _remove_shareholder. That might raise NotEnoughPeersError, + # which will cause the DeferredList to errback but which should + # otherwise be consumed. Allow non-NotEnoughPeersError exceptions + # to pass through as an unhandled errback. We use this in lieu of + # consumeErrors=True to allow coding errors to be logged. + f.trap(NotEnoughPeersError) + return None + for d0 in dl: + d0.addErrback(_eatNotEnoughPeersError) + return d def send_all_subshare_hash_trees(self): log.msg("%s sending subshare hash trees" % self) @@ -285,8 +307,7 @@ class Encoder(object): # hashes is a list of the hashes of all subshares that were sent # to shareholder[shareid]. dl.append(self.send_one_subshare_hash_tree(shareid, hashes)) - return defer.DeferredList(dl, fireOnOneErrback=True, - consumeErrors=True) + return self._gather_responses(dl) def send_one_subshare_hash_tree(self, shareid, subshare_hashes): t = HashTree(subshare_hashes) @@ -322,8 +343,7 @@ class Encoder(object): needed_hash_indices = t.needed_hashes(i, include_leaf=True) hashes = [(hi, t[hi]) for hi in needed_hash_indices] dl.append(self.send_one_share_hash_tree(i, hashes)) - return defer.DeferredList(dl, fireOnOneErrback=True, - consumeErrors=True) + return self._gather_responses(dl) def send_one_share_hash_tree(self, shareid, needed_hashes): if shareid not in self.landlords: @@ -340,8 +360,7 @@ class Encoder(object): dl = [] for shareid in self.landlords.keys(): dl.append(self.send_thingA(shareid, thingA)) - return defer.DeferredList(dl, fireOnOneErrback=True, - consumeErrors=True) + return self._gather_responses(dl) def send_thingA(self, shareid, thingA): sh = self.landlords[shareid] @@ -356,8 +375,7 @@ class Encoder(object): d = self.landlords[shareid].callRemote("close") d.addErrback(self._remove_shareholder, shareid, "close") dl.append(d) - return defer.DeferredList(dl, fireOnOneErrback=True, - consumeErrors=True) + return self._gather_responses(dl) def done(self): log.msg("%s: upload done" % self)