dl.append(d)
subshare_hash = block_hash(subshare)
self.subshare_hashes[shareid].append(subshare_hash)
- dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
+ dl = self._gather_responses(dl)
def _logit(res):
log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
return res
def _remove_shareholder(self, why, shareid, where):
log.msg("error while sending %s to shareholder=%d: %s" %
(where, shareid, why)) # UNUSUAL
- del self.landlords[shareid]
+ if shareid in self.landlords:
+ del self.landlords[shareid]
+ else:
+ # even more UNUSUAL
+ log.msg(" weird, they weren't in our list of landlords")
if len(self.landlords) < self.shares_of_happiness:
msg = "lost too many shareholders during upload"
raise NotEnoughPeersError(msg)
+ log.msg("but we can still continue with %s shares, we'll be happy "
+ "with at least %s" % (len(self.landlords),
+ self.shares_of_happiness))
+
+ def _gather_responses(self, dl):
+ d = defer.DeferredList(dl, fireOnOneErrback=True)
+ def _eatNotEnoughPeersError(f):
+ # all exceptions that occur while talking to a peer are handled
+ # in _remove_shareholder. That might raise NotEnoughPeersError,
+ # which will cause the DeferredList to errback but which should
+ # otherwise be consumed. Allow non-NotEnoughPeersError exceptions
+ # to pass through as an unhandled errback. We use this in lieu of
+ # consumeErrors=True to allow coding errors to be logged.
+ f.trap(NotEnoughPeersError)
+ return None
+ for d0 in dl:
+ d0.addErrback(_eatNotEnoughPeersError)
+ return d
def send_all_subshare_hash_trees(self):
log.msg("%s sending subshare hash trees" % self)
# hashes is a list of the hashes of all subshares that were sent
# to shareholder[shareid].
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
- return defer.DeferredList(dl, fireOnOneErrback=True,
- consumeErrors=True)
+ return self._gather_responses(dl)
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
t = HashTree(subshare_hashes)
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
dl.append(self.send_one_share_hash_tree(i, hashes))
- return defer.DeferredList(dl, fireOnOneErrback=True,
- consumeErrors=True)
+ return self._gather_responses(dl)
def send_one_share_hash_tree(self, shareid, needed_hashes):
if shareid not in self.landlords:
dl = []
for shareid in self.landlords.keys():
dl.append(self.send_thingA(shareid, thingA))
- return defer.DeferredList(dl, fireOnOneErrback=True,
- consumeErrors=True)
+ return self._gather_responses(dl)
def send_thingA(self, shareid, thingA):
sh = self.landlords[shareid]
d = self.landlords[shareid].callRemote("close")
d.addErrback(self._remove_shareholder, shareid, "close")
dl.append(d)
- return defer.DeferredList(dl, fireOnOneErrback=True,
- consumeErrors=True)
+ return self._gather_responses(dl)
def done(self):
log.msg("%s: upload done" % self)