]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
immutable repairer: errback any pending readers of DownUpConnectorwhen it runs out...
authorZooko O'Whielacronx <zooko@zooko.com>
Thu, 12 Feb 2009 03:11:29 +0000 (20:11 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Thu, 12 Feb 2009 03:11:29 +0000 (20:11 -0700)
src/allmydata/immutable/repairer.py
src/allmydata/test/test_repairer.py

index 880188c8b83334d04ec5a9f4095e50573973fae2..92957d977ee86a029fb08b5e53fc08374eebc5fe 100644 (file)
@@ -73,6 +73,16 @@ class Repairer(log.PrefixingLogMixin):
 
         return d
 
+class PrematureClose(Exception):
+    # Uploader asked DUC to read a certain number of bytes, and
+    # Downloader closed DUC before writing enough bytes to satisfy the
+    # read.
+    def __init__(self, requested, avail):
+        self.requested = requested
+        self.avail = avail
+    def __repr__(self):
+        return "<%s requested: %d, avail: %d>" % (self.__class__.__name__, self.requested, self.avail)
+
 class DownUpConnector(log.PrefixingLogMixin):
     implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
     """ I act like an "encrypted uploadable" -- something that a local uploader can read
@@ -174,6 +184,10 @@ class DownUpConnector(log.PrefixingLogMixin):
         pass
     def close(self):
         self._closed_to_pusher = True
+        # Any reads which haven't been satisfied by now are not going to be satisfied.
+        while self.next_read_ds:
+            self.next_read_ds.popleft().errback(
+                PrematureClose(self.next_read_lens.popleft(), self.bufsiz))
 
     # methods to satisfy the IEncryptedUploader interface
     # (From the perspective of an uploader I am an IEncryptedUploadable.)
index 814e97d79bba872d8a68afc547088b5aa6b5b48a..c7f1e50bab8eef727aeb00bad24294968ce2df14 100644 (file)
@@ -353,7 +353,7 @@ class DownUpConnector(unittest.TestCase):
         duc.write('\x02')
         return d
 
-    def test_leftovers(self):
+    def test_extra(self):
         duc = repairer.DownUpConnector()
         duc.registerProducer(None, True) # just because you have to call registerProducer first
         # case 1: total data in buf is < requested data at time of request
@@ -367,6 +367,20 @@ class DownUpConnector(unittest.TestCase):
         duc.write('\x02\0x03')
         return d
 
+    def test_premature_close(self):
+        duc = repairer.DownUpConnector()
+        duc.registerProducer(None, True) # just because you have to call registerProducer first
+        d = duc.read_encrypted(2, False)
+        duc.close()
+        def _callb(f):
+            self.fail("Should have errbacked.")
+        def _errb(f):
+            f.trap(repairer.PrematureClose)
+            self.failUnlessEqual(f.value.requested, 2)
+            # Okay, you pass the test.
+        d.addCallbacks(_callb, _errb)
+        return d
+
 class Repairer(common.ShareManglingMixin, unittest.TestCase):
     def test_test_code(self):
         # The following process of stashing the shares, running