From 76d7cc440444141b4ea84a3fc3327d3c06b620a4 Mon Sep 17 00:00:00 2001 From: Zooko O'Whielacronx Date: Wed, 11 Feb 2009 20:11:29 -0700 Subject: [PATCH] immutable repairer: errback any pending readers of DownUpConnectorwhen it runs out of bytes, and test that fact --- src/allmydata/immutable/repairer.py | 14 ++++++++++++++ src/allmydata/test/test_repairer.py | 16 +++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/allmydata/immutable/repairer.py b/src/allmydata/immutable/repairer.py index 880188c8..92957d97 100644 --- a/src/allmydata/immutable/repairer.py +++ b/src/allmydata/immutable/repairer.py @@ -73,6 +73,16 @@ class Repairer(log.PrefixingLogMixin): return d +class PrematureClose(Exception): + # Uploader asked DUC to read a certain number of bytes, and + # Downloader closed DUC before writing enough bytes to satisfy the + # read. + def __init__(self, requested, avail): + self.requested = requested + self.avail = avail + def __repr__(self): + return "<%s requested: %d, avail: %d>" % (self.__class__.__name__, self.requested, self.avail) + class DownUpConnector(log.PrefixingLogMixin): implements(IEncryptedUploadable, IDownloadTarget, IConsumer) """ I act like an "encrypted uploadable" -- something that a local uploader can read @@ -174,6 +184,10 @@ class DownUpConnector(log.PrefixingLogMixin): pass def close(self): self._closed_to_pusher = True + # Any reads which haven't been satisfied by now are not going to be satisfied. + while self.next_read_ds: + self.next_read_ds.popleft().errback( + PrematureClose(self.next_read_lens.popleft(), self.bufsiz)) # methods to satisfy the IEncryptedUploader interface # (From the perspective of an uploader I am an IEncryptedUploadable.) diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index 814e97d7..c7f1e50b 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -353,7 +353,7 @@ class DownUpConnector(unittest.TestCase): duc.write('\x02') return d - def test_leftovers(self): + def test_extra(self): duc = repairer.DownUpConnector() duc.registerProducer(None, True) # just because you have to call registerProducer first # case 1: total data in buf is < requested data at time of request @@ -367,6 +367,20 @@ class DownUpConnector(unittest.TestCase): duc.write('\x02\0x03') return d + def test_premature_close(self): + duc = repairer.DownUpConnector() + duc.registerProducer(None, True) # just because you have to call registerProducer first + d = duc.read_encrypted(2, False) + duc.close() + def _callb(f): + self.fail("Should have errbacked.") + def _errb(f): + f.trap(repairer.PrematureClose) + self.failUnlessEqual(f.value.requested, 2) + # Okay, you pass the test. + d.addCallbacks(_callb, _errb) + return d + class Repairer(common.ShareManglingMixin, unittest.TestCase): def test_test_code(self): # The following process of stashing the shares, running -- 2.45.2