From b1ca238176bfea38bda18d2531e967fcc1ad28a8 Mon Sep 17 00:00:00 2001
From: Brian Warner <>
Date: Tue, 28 Oct 2008 17:56:18 -0700
Subject: [PATCH] #527: respond to GETs with early ranges quickly, without
 waiting for the whole file to download. Fixes the alacrity problems with the
 earlier code. Still needs cache expiration.

 src/allmydata/immutable/ | 163 ++++++++++++++++++----------
 src/allmydata/test/   |  13 +++
 2 files changed, 121 insertions(+), 55 deletions(-)

diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index a9a4aa4e..40c98a83 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -5,8 +5,10 @@ from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.protocols import basic
-from allmydata.interfaces import IFileNode, IFileURI, ICheckable
-from allmydata.util import observer, log, base32
+from foolscap.eventual import eventually
+from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
+     IDownloadTarget
+from allmydata.util import log, base32
 from allmydata.immutable.checker import SimpleCHKFileChecker, \
 from allmydata.immutable import download
@@ -60,21 +62,116 @@ class PortionOfFile:
             self.bytes_left -= len(data)
         return data
-class FileNode(_ImmutableFileNodeBase):
-    checker_class = SimpleCHKFileChecker
-    verifier_class = SimpleCHKFileVerifier
+class CacheFile:
+    implements(IDownloadTarget)
-    def __init__(self, uri, client, cachefile):
-        _ImmutableFileNodeBase.__init__(self, uri, client)
-        self.cachefile = cachefile
+    def __init__(self, node, filename):
+        self.node = node
+        self.milestones = set() # of (offset,size,Deferred)
+        self.cachefilename = filename
+        self.download_in_progress = False
         # five states:
         #  new FileNode, no downloads ever performed
         #  new FileNode, leftover file (partial)
         #  new FileNode, leftover file (whole)
         #  download in progress, not yet complete
         #  download complete
+    def when_range_available(self, offset, size):
+        assert isinstance(offset, (int,long))
+        assert isinstance(size, (int,long))
+        d = defer.Deferred()
+        self.milestones.add( (offset,size,d) )
+        self._check_milestones()
+        if self.milestones and not self.download_in_progress:
+            self.download_in_progress = True
+            log.msg(format=("immutable filenode read [%(si)s]: " +
+                            "starting download"),
+                    si=base32.b2a(self.node.u.storage_index),
+                    umid="h26Heg", level=log.OPERATIONAL)
+            downloader = self.node._client.getServiceNamed("downloader")
+            d2 =, self)
+            d2.addBoth(self._download_done)
+            d2.addErrback(self._download_failed)
+            d2.addErrback(log.err, umid="cQaM9g")
+        return d
+    def read(self, consumer, offset, size):
+        assert offset+size <= self.get_filesize()
+        f = PortionOfFile(self.cachefilename, offset, size)
+        d = basic.FileSender().beginFileTransfer(f, consumer)
+        d.addCallback(lambda lastSent: consumer)
+        return d
+    def _download_done(self, res):
+        # clear download_in_progress, so failed downloads can be re-tried
         self.download_in_progress = False
-        self.fully_cached_observer = observer.OneShotObserverList()
+        return res
+    def _download_failed(self, f):
+        # tell anyone who's waiting that we failed
+        for m in self.milestones:
+            (offset,size,d) = m
+            eventually(d.errback, f)
+        self.milestones.clear()
+    def _check_milestones(self):
+        current_size = self.get_filesize()
+        for m in list(self.milestones):
+            (offset,size,d) = m
+            if offset+size <= current_size:
+                log.msg(format=("immutable filenode read [%(si)s] " +
+                                "%(offset)d+%(size)d vs %(filesize)d: " +
+                                "done"),
+                        si=base32.b2a(self.node.u.storage_index),
+                        offset=offset, size=size, filesize=current_size,
+                        umid="nuedUg", level=log.NOISY)
+                self.milestones.discard(m)
+                eventually(d.callback, None)
+            else:
+                log.msg(format=("immutable filenode read [%(si)s] " +
+                                "%(offset)d+%(size)d vs %(filesize)d: " +
+                                "still waiting"),
+                        si=base32.b2a(self.node.u.storage_index),
+                        offset=offset, size=size, filesize=current_size,
+                        umid="8PKOhg", level=log.NOISY)
+    def get_filesize(self):
+        try:
+            filesize = os.stat(self.cachefilename)[stat.ST_SIZE]
+        except OSError:
+            filesize = 0
+        return filesize
+    def open(self, size):
+        self.f = open(self.cachefilename, "wb")
+    def write(self, data):
+        self.f.write(data)
+        self._check_milestones()
+    def close(self):
+        self.f.close()
+        self._check_milestones()
+    def fail(self, why):
+        pass
+    def register_canceller(self, cb):
+        pass
+    def finish(self):
+        return None
+class FileNode(_ImmutableFileNodeBase):
+    checker_class = SimpleCHKFileChecker
+    verifier_class = SimpleCHKFileVerifier
+    def __init__(self, uri, client, cachefile):
+        _ImmutableFileNodeBase.__init__(self, uri, client)
+        self.cachefile = CacheFile(self, cachefile)
     def get_uri(self):
         return self.u.to_string()
@@ -121,23 +218,6 @@ class FileNode(_ImmutableFileNodeBase):
         if size is None:
             size = self.get_size() - offset
-        assert self.cachefile
-        try:
-            filesize = os.stat(self.cachefile)[stat.ST_SIZE]
-        except OSError:
-            filesize = 0
-        if filesize >= offset+size:
-            log.msg(format=("immutable filenode read [%(si)s]: " +
-                            "satisfied from cache " +
-                            "(read %(start)d+%(size)d, filesize %(filesize)d)"),
-                    si=base32.b2a(self.u.storage_index),
-                    start=offset, size=size, filesize=filesize,
-                    umid="5p5ECA", level=log.OPERATIONAL)
-            f = PortionOfFile(self.cachefile, offset, size)
-            d = basic.FileSender().beginFileTransfer(f, consumer)
-            d.addCallback(lambda lastSent: consumer)
-            return d
         if offset == 0 and size == self.get_size():
             # don't use the cache, just do a normal streaming download
@@ -147,37 +227,10 @@ class FileNode(_ImmutableFileNodeBase):
                     umid="VRSBwg", level=log.OPERATIONAL)
-        if not self.download_in_progress:
-            log.msg(format=("immutable filenode read [%(si)s]: " +
-                            "starting download"),
-                    si=base32.b2a(self.u.storage_index),
-                    umid="h26Heg", level=log.OPERATIONAL)
-            self.start_download_to_cache()
-        # The file is being downloaded, but the portion we want isn't yet
-        # available, so we have to wait. First cut: wait for the whole thing
-        # to download. The second cut will be to wait for a specific range
-        # milestone, with a download target that counts bytes and compares
-        # them against a milestone list.
-        log.msg(format=("immutable filenode read [%(si)s]: " +
-                        "waiting for download"),
-                si=base32.b2a(self.u.storage_index),
-                umid="l48V7Q", level=log.OPERATIONAL)
-        d = self.when_fully_cached()
-        d.addCallback(lambda ignored:, offset, size))
+        d = self.cachefile.when_range_available(offset, size)
+        d.addCallback(lambda res:, offset, size))
         return d
-    def start_download_to_cache(self):
-        assert not self.download_in_progress
-        self.download_in_progress = True
-        downloader = self._client.getServiceNamed("downloader")
-        d = downloader.download_to_filename(self.get_uri(), self.cachefile)
-        d.addBoth(
-    def when_fully_cached(self):
-        return self.fully_cached_observer.when_fired()
     def download(self, target):
         downloader = self._client.getServiceNamed("downloader")
         return, target)
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 70c03183..5a2fff15 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -202,9 +202,22 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
             def _read_tail_done(mc):
                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
             return d
+        def _test_bad_read(res):
+            bad_u = uri.from_string_filenode(self.uri)
+            bad_u.key = self.flip_bit(bad_u.key)
+            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
+            # this should cause an error during download
+            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
+                                 None,
+                       , MemoryConsumer(), offset=2)
+            return d
+        d.addCallback(_test_bad_read)
         def _download_nonexistent_uri(res):
             baduri = self.mangle_uri(self.uri)
             log.msg("about to download non-existent URI", level=log.UNUSUAL,