From: Zooko O'Whielacronx Date: Thu, 12 Feb 2009 03:11:29 +0000 (-0700) Subject: immutable repairer: errback any pending readers of DownUpConnectorwhen it runs out... X-Git-Tag: allmydata-tahoe-1.3.0~21 X-Git-Url: https://git.rkrishnan.org/vdrive/components/com_hotproperty/status?a=commitdiff_plain;h=76d7cc440444141b4ea84a3fc3327d3c06b620a4;p=tahoe-lafs%2Ftahoe-lafs.git immutable repairer: errback any pending readers of DownUpConnectorwhen it runs out of bytes, and test that fact --- diff --git a/src/allmydata/immutable/repairer.py b/src/allmydata/immutable/repairer.py index 880188c8..92957d97 100644 --- a/src/allmydata/immutable/repairer.py +++ b/src/allmydata/immutable/repairer.py @@ -73,6 +73,16 @@ class Repairer(log.PrefixingLogMixin): return d +class PrematureClose(Exception): + # Uploader asked DUC to read a certain number of bytes, and + # Downloader closed DUC before writing enough bytes to satisfy the + # read. + def __init__(self, requested, avail): + self.requested = requested + self.avail = avail + def __repr__(self): + return "<%s requested: %d, avail: %d>" % (self.__class__.__name__, self.requested, self.avail) + class DownUpConnector(log.PrefixingLogMixin): implements(IEncryptedUploadable, IDownloadTarget, IConsumer) """ I act like an "encrypted uploadable" -- something that a local uploader can read @@ -174,6 +184,10 @@ class DownUpConnector(log.PrefixingLogMixin): pass def close(self): self._closed_to_pusher = True + # Any reads which haven't been satisfied by now are not going to be satisfied. + while self.next_read_ds: + self.next_read_ds.popleft().errback( + PrematureClose(self.next_read_lens.popleft(), self.bufsiz)) # methods to satisfy the IEncryptedUploader interface # (From the perspective of an uploader I am an IEncryptedUploadable.) diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index 814e97d7..c7f1e50b 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -353,7 +353,7 @@ class DownUpConnector(unittest.TestCase): duc.write('\x02') return d - def test_leftovers(self): + def test_extra(self): duc = repairer.DownUpConnector() duc.registerProducer(None, True) # just because you have to call registerProducer first # case 1: total data in buf is < requested data at time of request @@ -367,6 +367,20 @@ class DownUpConnector(unittest.TestCase): duc.write('\x02\0x03') return d + def test_premature_close(self): + duc = repairer.DownUpConnector() + duc.registerProducer(None, True) # just because you have to call registerProducer first + d = duc.read_encrypted(2, False) + duc.close() + def _callb(f): + self.fail("Should have errbacked.") + def _errb(f): + f.trap(repairer.PrematureClose) + self.failUnlessEqual(f.value.requested, 2) + # Okay, you pass the test. + d.addCallbacks(_callb, _errb) + return d + class Repairer(common.ShareManglingMixin, unittest.TestCase): def test_test_code(self): # The following process of stashing the shares, running