]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload: abort the bucket upon any write error, and do it with callRemoteOnly to avoid...
authorBrian Warner <warner@allmydata.com>
Tue, 10 Jun 2008 18:55:28 +0000 (11:55 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 10 Jun 2008 18:55:28 +0000 (11:55 -0700)
src/allmydata/encode.py
src/allmydata/storage.py

index affb181fb66b98ec73469278e120392eccd9c1cb..766292fee4baef022ea9f654580f0d6279b28298 100644 (file)
@@ -251,7 +251,7 @@ class Encoder(object):
         d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
 
         d.addCallback(lambda res: self.close_all_shareholders())
-        d.addCallbacks(lambda res: self.done(), self.err)
+        d.addCallbacks(self.done, self.err)
         return d
 
     def set_status(self, status):
@@ -481,6 +481,7 @@ class Encoder(object):
                       method=where, shnum=shareid,
                       level=log.UNUSUAL, failure=why)
         if shareid in self.landlords:
+            self.landlords[shareid].abort()
             del self.landlords[shareid]
         else:
             # even more UNUSUAL
@@ -678,7 +679,7 @@ class Encoder(object):
             dl.append(d)
         return self._gather_responses(dl)
 
-    def done(self):
+    def done(self, res):
         self.log("upload done", level=log.OPERATIONAL)
         self.set_status("Done")
         self.set_encode_and_push_progress(extra=1.0) # done
@@ -699,19 +700,11 @@ class Encoder(object):
         # we need to abort any remaining shareholders, so they'll delete the
         # partial share, allowing someone else to upload it again.
         self.log("aborting shareholders", level=log.UNUSUAL)
-        dl = []
         for shareid in list(self.landlords.keys()):
-            d = self.landlords[shareid].abort()
-            d.addErrback(self._remove_shareholder, shareid, "abort")
-            dl.append(d)
-        d = self._gather_responses(dl)
-        def _done(res):
-            self.log("shareholders aborted", level=log.UNUSUAL)
-            if f.check(defer.FirstError):
-                return f.value.subFailure
-            return f
-        d.addCallback(_done)
-        return d
+            self.landlords[shareid].abort()
+        if f.check(defer.FirstError):
+            return f.value.subFailure
+        return f
 
     def get_shares_placed(self):
         # return a set of share numbers that were successfully placed.
index 7bee77d5d80827cb945dc061ee567ef035f7f6b7..0627d7d13d1fc0c611d6193812331630850b0831 100644 (file)
@@ -1215,7 +1215,7 @@ class WriteBucketProxy:
         return self._rref.callRemote("close")
 
     def abort(self):
-        return self._rref.callRemote("abort")
+        return self._rref.callRemoteOnly("abort")
 
 class ReadBucketProxy:
     implements(IStorageBucketReader)