]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
encode.py: clean up handling of lost peers during upload, add some logging
authorBrian Warner <warner@allmydata.com>
Wed, 6 Jun 2007 19:40:16 +0000 (12:40 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 6 Jun 2007 19:40:16 +0000 (12:40 -0700)
src/allmydata/encode.py

index 57b2c3e965994707bbe1ac420b4d534d6a15c44c..84faa77036323b88945dafd36a478c35ef68bb2c 100644 (file)
@@ -254,7 +254,7 @@ class Encoder(object):
             dl.append(d)
             subshare_hash = block_hash(subshare)
             self.subshare_hashes[shareid].append(subshare_hash)
-        dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
+        dl = self._gather_responses(dl)
         def _logit(res):
             log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
             return res
@@ -273,10 +273,32 @@ class Encoder(object):
     def _remove_shareholder(self, why, shareid, where):
         log.msg("error while sending %s to shareholder=%d: %s" %
                 (where, shareid, why)) # UNUSUAL
-        del self.landlords[shareid]
+        if shareid in self.landlords:
+            del self.landlords[shareid]
+        else:
+            # even more UNUSUAL
+            log.msg(" weird, they weren't in our list of landlords")
         if len(self.landlords) < self.shares_of_happiness:
             msg = "lost too many shareholders during upload"
             raise NotEnoughPeersError(msg)
+        log.msg("but we can still continue with %s shares, we'll be happy "
+                "with at least %s" % (len(self.landlords),
+                                      self.shares_of_happiness))
+
+    def _gather_responses(self, dl):
+        d = defer.DeferredList(dl, fireOnOneErrback=True)
+        def _eatNotEnoughPeersError(f):
+            # all exceptions that occur while talking to a peer are handled
+            # in _remove_shareholder. That might raise NotEnoughPeersError,
+            # which will cause the DeferredList to errback but which should
+            # otherwise be consumed. Allow non-NotEnoughPeersError exceptions
+            # to pass through as an unhandled errback. We use this in lieu of
+            # consumeErrors=True to allow coding errors to be logged.
+            f.trap(NotEnoughPeersError)
+            return None
+        for d0 in dl:
+            d0.addErrback(_eatNotEnoughPeersError)
+        return d
 
     def send_all_subshare_hash_trees(self):
         log.msg("%s sending subshare hash trees" % self)
@@ -285,8 +307,7 @@ class Encoder(object):
             # hashes is a list of the hashes of all subshares that were sent
             # to shareholder[shareid].
             dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
-        return defer.DeferredList(dl, fireOnOneErrback=True,
-                                  consumeErrors=True)
+        return self._gather_responses(dl)
 
     def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
         t = HashTree(subshare_hashes)
@@ -322,8 +343,7 @@ class Encoder(object):
             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
             dl.append(self.send_one_share_hash_tree(i, hashes))
-        return defer.DeferredList(dl, fireOnOneErrback=True,
-                                  consumeErrors=True)
+        return self._gather_responses(dl)
 
     def send_one_share_hash_tree(self, shareid, needed_hashes):
         if shareid not in self.landlords:
@@ -340,8 +360,7 @@ class Encoder(object):
         dl = []
         for shareid in self.landlords.keys():
             dl.append(self.send_thingA(shareid, thingA))
-        return defer.DeferredList(dl, fireOnOneErrback=True,
-                                  consumeErrors=True)
+        return self._gather_responses(dl)
 
     def send_thingA(self, shareid, thingA):
         sh = self.landlords[shareid]
@@ -356,8 +375,7 @@ class Encoder(object):
             d = self.landlords[shareid].callRemote("close")
             d.addErrback(self._remove_shareholder, shareid, "close")
             dl.append(d)
-        return defer.DeferredList(dl, fireOnOneErrback=True,
-                                  consumeErrors=True)
+        return self._gather_responses(dl)
 
     def done(self):
         log.msg("%s: upload done" % self)