From 22a07e9bbe682d6e36efd97fdd44b82db8fcbfd4 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Wed, 4 Aug 2010 00:26:29 -0700
Subject: [PATCH] Rewrite immutable downloader (#798). This patch adds the new
 downloader itself.

---
 src/allmydata/immutable/download.py           | 1321 -----------------
 .../immutable/downloader/__init__.py          |    0
 src/allmydata/immutable/downloader/common.py  |   13 +
 src/allmydata/immutable/downloader/fetcher.py |  229 +++
 src/allmydata/immutable/downloader/finder.py  |  202 +++
 src/allmydata/immutable/downloader/node.py    |  471 ++++++
 .../immutable/downloader/segmentation.py      |  160 ++
 src/allmydata/immutable/downloader/share.py   |  848 +++++++++++
 src/allmydata/immutable/downloader/status.py  |  170 +++
 9 files changed, 2093 insertions(+), 1321 deletions(-)
 delete mode 100644 src/allmydata/immutable/download.py
 create mode 100644 src/allmydata/immutable/downloader/__init__.py
 create mode 100644 src/allmydata/immutable/downloader/common.py
 create mode 100644 src/allmydata/immutable/downloader/fetcher.py
 create mode 100644 src/allmydata/immutable/downloader/finder.py
 create mode 100644 src/allmydata/immutable/downloader/node.py
 create mode 100644 src/allmydata/immutable/downloader/segmentation.py
 create mode 100644 src/allmydata/immutable/downloader/share.py
 create mode 100644 src/allmydata/immutable/downloader/status.py

diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
deleted file mode 100644
index eb02c6aa..00000000
--- a/src/allmydata/immutable/download.py
+++ /dev/null
@@ -1,1321 +0,0 @@
-import random, weakref, itertools, time
-from zope.interface import implements
-from twisted.internet import defer, reactor
-from twisted.internet.interfaces import IPushProducer, IConsumer
-from foolscap.api import DeadReferenceError, RemoteException, eventually
-
-from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
-from allmydata.util.assertutil import _assert, precondition
-from allmydata import codec, hashtree, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
-     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
-     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
-     UnableToFetchCriticalDownloadDataError
-from allmydata.immutable import layout
-from allmydata.monitor import Monitor
-from pycryptopp.cipher.aes import AES
-
-class IntegrityCheckReject(Exception):
-    pass
-
-class BadURIExtensionHashValue(IntegrityCheckReject):
-    pass
-class BadURIExtension(IntegrityCheckReject):
-    pass
-class UnsupportedErasureCodec(BadURIExtension):
-    pass
-class BadCrypttextHashValue(IntegrityCheckReject):
-    pass
-class BadOrMissingHash(IntegrityCheckReject):
-    pass
-
-class DownloadStopped(Exception):
-    pass
-
-class DownloadResults:
-    implements(IDownloadResults)
-
-    def __init__(self):
-        self.servers_used = set()
-        self.server_problems = {}
-        self.servermap = {}
-        self.timings = {}
-        self.file_size = None
-
-class DecryptingTarget(log.PrefixingLogMixin):
-    implements(IDownloadTarget, IConsumer)
-    def __init__(self, target, key, _log_msg_id=None):
-        precondition(IDownloadTarget.providedBy(target), target)
-        self.target = target
-        self._decryptor = AES(key)
-        prefix = str(target)
-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
-    # methods to satisfy the IConsumer interface
-    def registerProducer(self, producer, streaming):
-        if IConsumer.providedBy(self.target):
-            self.target.registerProducer(producer, streaming)
-    def unregisterProducer(self):
-        if IConsumer.providedBy(self.target):
-            self.target.unregisterProducer()
-    def write(self, ciphertext):
-        plaintext = self._decryptor.process(ciphertext)
-        self.target.write(plaintext)
-    def open(self, size):
-        self.target.open(size)
-    def close(self):
-        self.target.close()
-    def finish(self):
-        return self.target.finish()
-    # The following methods is just to pass through to the next target, and
-    # just because that target might be a repairer.DownUpConnector, and just
-    # because the current CHKUpload object expects to find the storage index
-    # in its Uploadable.
-    def set_storageindex(self, storageindex):
-        self.target.set_storageindex(storageindex)
-    def set_encodingparams(self, encodingparams):
-        self.target.set_encodingparams(encodingparams)
-
-class ValidatedThingObtainer:
-    def __init__(self, validatedthingproxies, debugname, log_id):
-        self._validatedthingproxies = validatedthingproxies
-        self._debugname = debugname
-        self._log_id = log_id
-
-    def _bad(self, f, validatedthingproxy):
-        f.trap(RemoteException, DeadReferenceError,
-               IntegrityCheckReject, layout.LayoutInvalid,
-               layout.ShareVersionIncompatible)
-        level = log.WEIRD
-        if f.check(DeadReferenceError):
-            level = log.UNUSUAL
-        elif f.check(RemoteException):
-            level = log.WEIRD
-        else:
-            level = log.SCARY
-        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
-                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
-                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
-                failure=f, level=level, umid="JGXxBA")
-        if not self._validatedthingproxies:
-            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
-        # try again with a different one
-        d = self._try_the_next_one()
-        return d
-
-    def _try_the_next_one(self):
-        vtp = self._validatedthingproxies.pop(0)
-        # start() obtains, validates, and callsback-with the thing or else
-        # errbacks
-        d = vtp.start()
-        d.addErrback(self._bad, vtp)
-        return d
-
-    def start(self):
-        return self._try_the_next_one()
-
-class ValidatedCrypttextHashTreeProxy:
-    implements(IValidatedThingProxy)
-    """ I am a front-end for a remote crypttext hash tree using a local
-    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
-    Validated Thing protocol (i.e., I have a start() method that fires with
-    self once I get a valid one)."""
-    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
-                 fetch_failures=None):
-        # fetch_failures is for debugging -- see test_encode.py
-        self._readbucketproxy = readbucketproxy
-        self._num_segments = num_segments
-        self._fetch_failures = fetch_failures
-        self._crypttext_hash_tree = crypttext_hash_tree
-
-    def _validate(self, proposal):
-        ct_hashes = dict(list(enumerate(proposal)))
-        try:
-            self._crypttext_hash_tree.set_hashes(ct_hashes)
-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
-            if self._fetch_failures is not None:
-                self._fetch_failures["crypttext_hash_tree"] += 1
-            raise BadOrMissingHash(le)
-        # If we now have enough of the crypttext hash tree to integrity-check
-        # *any* segment of ciphertext, then we are done. TODO: It would have
-        # better alacrity if we downloaded only part of the crypttext hash
-        # tree at a time.
-        for segnum in range(self._num_segments):
-            if self._crypttext_hash_tree.needed_hashes(segnum):
-                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
-        return self
-
-    def start(self):
-        d = self._readbucketproxy.get_crypttext_hashes()
-        d.addCallback(self._validate)
-        return d
-
-class ValidatedExtendedURIProxy:
-    implements(IValidatedThingProxy)
-    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
-    responsible for retrieving and validating the elements from the UEB."""
-
-    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
-        # fetch_failures is for debugging -- see test_encode.py
-        self._fetch_failures = fetch_failures
-        self._readbucketproxy = readbucketproxy
-        precondition(IVerifierURI.providedBy(verifycap), verifycap)
-        self._verifycap = verifycap
-
-        # required
-        self.segment_size = None
-        self.crypttext_root_hash = None
-        self.share_root_hash = None
-
-        # computed
-        self.block_size = None
-        self.share_size = None
-        self.num_segments = None
-        self.tail_data_size = None
-        self.tail_segment_size = None
-
-        # optional
-        self.crypttext_hash = None
-
-    def __str__(self):
-        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
-
-    def _check_integrity(self, data):
-        h = hashutil.uri_extension_hash(data)
-        if h != self._verifycap.uri_extension_hash:
-            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
-                   (self._readbucketproxy,
-                    base32.b2a(self._verifycap.uri_extension_hash),
-                    base32.b2a(h)))
-            if self._fetch_failures is not None:
-                self._fetch_failures["uri_extension"] += 1
-            raise BadURIExtensionHashValue(msg)
-        else:
-            return data
-
-    def _parse_and_validate(self, data):
-        self.share_size = mathutil.div_ceil(self._verifycap.size,
-                                            self._verifycap.needed_shares)
-
-        d = uri.unpack_extension(data)
-
-        # There are several kinds of things that can be found in a UEB.
-        # First, things that we really need to learn from the UEB in order to
-        # do this download. Next: things which are optional but not redundant
-        # -- if they are present in the UEB they will get used. Next, things
-        # that are optional and redundant. These things are required to be
-        # consistent: they don't have to be in the UEB, but if they are in
-        # the UEB then they will be checked for consistency with the
-        # already-known facts, and if they are inconsistent then an exception
-        # will be raised. These things aren't actually used -- they are just
-        # tested for consistency and ignored. Finally: things which are
-        # deprecated -- they ought not be in the UEB at all, and if they are
-        # present then a warning will be logged but they are otherwise
-        # ignored.
-
-        # First, things that we really need to learn from the UEB:
-        # segment_size, crypttext_root_hash, and share_root_hash.
-        self.segment_size = d['segment_size']
-
-        self.block_size = mathutil.div_ceil(self.segment_size,
-                                            self._verifycap.needed_shares)
-        self.num_segments = mathutil.div_ceil(self._verifycap.size,
-                                              self.segment_size)
-
-        self.tail_data_size = self._verifycap.size % self.segment_size
-        if not self.tail_data_size:
-            self.tail_data_size = self.segment_size
-        # padding for erasure code
-        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
-                                                        self._verifycap.needed_shares)
-
-        # Ciphertext hash tree root is mandatory, so that there is at most
-        # one ciphertext that matches this read-cap or verify-cap. The
-        # integrity check on the shares is not sufficient to prevent the
-        # original encoder from creating some shares of file A and other
-        # shares of file B.
-        self.crypttext_root_hash = d['crypttext_root_hash']
-
-        self.share_root_hash = d['share_root_hash']
-
-
-        # Next: things that are optional and not redundant: crypttext_hash
-        if d.has_key('crypttext_hash'):
-            self.crypttext_hash = d['crypttext_hash']
-            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
-                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
-
-
-        # Next: things that are optional, redundant, and required to be
-        # consistent: codec_name, codec_params, tail_codec_params,
-        # num_segments, size, needed_shares, total_shares
-        if d.has_key('codec_name'):
-            if d['codec_name'] != "crs":
-                raise UnsupportedErasureCodec(d['codec_name'])
-
-        if d.has_key('codec_params'):
-            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
-            if ucpss != self.segment_size:
-                raise BadURIExtension("inconsistent erasure code params: "
-                                      "ucpss: %s != self.segment_size: %s" %
-                                      (ucpss, self.segment_size))
-            if ucpns != self._verifycap.needed_shares:
-                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
-                                      "self._verifycap.needed_shares: %s" %
-                                      (ucpns, self._verifycap.needed_shares))
-            if ucpts != self._verifycap.total_shares:
-                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
-                                      "self._verifycap.total_shares: %s" %
-                                      (ucpts, self._verifycap.total_shares))
-
-        if d.has_key('tail_codec_params'):
-            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
-            if utcpss != self.tail_segment_size:
-                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
-                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
-                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
-                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
-                                         self.segment_size, self._verifycap.needed_shares))
-            if utcpns != self._verifycap.needed_shares:
-                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
-                                      "self._verifycap.needed_shares: %s" % (utcpns,
-                                                                             self._verifycap.needed_shares))
-            if utcpts != self._verifycap.total_shares:
-                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
-                                      "self._verifycap.total_shares: %s" % (utcpts,
-                                                                            self._verifycap.total_shares))
-
-        if d.has_key('num_segments'):
-            if d['num_segments'] != self.num_segments:
-                raise BadURIExtension("inconsistent num_segments: size: %s, "
-                                      "segment_size: %s, computed_num_segments: %s, "
-                                      "ueb_num_segments: %s" % (self._verifycap.size,
-                                                                self.segment_size,
-                                                                self.num_segments, d['num_segments']))
-
-        if d.has_key('size'):
-            if d['size'] != self._verifycap.size:
-                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
-                                      (self._verifycap.size, d['size']))
-
-        if d.has_key('needed_shares'):
-            if d['needed_shares'] != self._verifycap.needed_shares:
-                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
-                                      "needed shares: %s" % (self._verifycap.total_shares,
-                                                             d['needed_shares']))
-
-        if d.has_key('total_shares'):
-            if d['total_shares'] != self._verifycap.total_shares:
-                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
-                                      "total shares: %s" % (self._verifycap.total_shares,
-                                                            d['total_shares']))
-
-        # Finally, things that are deprecated and ignored: plaintext_hash,
-        # plaintext_root_hash
-        if d.get('plaintext_hash'):
-            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
-                    "and is no longer used.  Ignoring.  %s" % (self,))
-        if d.get('plaintext_root_hash'):
-            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
-                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
-
-        return self
-
-    def start(self):
-        """Fetch the UEB from bucket, compare its hash to the hash from
-        verifycap, then parse it. Returns a deferred which is called back
-        with self once the fetch is successful, or is erred back if it
-        fails."""
-        d = self._readbucketproxy.get_uri_extension()
-        d.addCallback(self._check_integrity)
-        d.addCallback(self._parse_and_validate)
-        return d
-
-class ValidatedReadBucketProxy(log.PrefixingLogMixin):
-    """I am a front-end for a remote storage bucket, responsible for
-    retrieving and validating data from that bucket.
-
-    My get_block() method is used by BlockDownloaders.
-    """
-
-    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
-                 block_size, share_size):
-        """ share_hash_tree is required to have already been initialized with
-        the root hash (the number-0 hash), using the share_root_hash from the
-        UEB"""
-        precondition(share_hash_tree[0] is not None, share_hash_tree)
-        prefix = "%d-%s-%s" % (sharenum, bucket,
-                               base32.b2a_l(share_hash_tree[0][:8], 60))
-        log.PrefixingLogMixin.__init__(self,
-                                       facility="tahoe.immutable.download",
-                                       prefix=prefix)
-        self.sharenum = sharenum
-        self.bucket = bucket
-        self.share_hash_tree = share_hash_tree
-        self.num_blocks = num_blocks
-        self.block_size = block_size
-        self.share_size = share_size
-        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
-
-    def get_all_sharehashes(self):
-        """Retrieve and validate all the share-hash-tree nodes that are
-        included in this share, regardless of whether we need them to
-        validate the share or not. Each share contains a minimal Merkle tree
-        chain, but there is lots of overlap, so usually we'll be using hashes
-        from other shares and not reading every single hash from this share.
-        The Verifier uses this function to read and validate every single
-        hash from this share.
-
-        Call this (and wait for the Deferred it returns to fire) before
-        calling get_block() for the first time: this lets us check that the
-        share share contains enough hashes to validate its own data, and
-        avoids downloading any share hash twice.
-
-        I return a Deferred which errbacks upon failure, probably with
-        BadOrMissingHash."""
-
-        d = self.bucket.get_share_hashes()
-        def _got_share_hashes(sh):
-            sharehashes = dict(sh)
-            try:
-                self.share_hash_tree.set_hashes(sharehashes)
-            except IndexError, le:
-                raise BadOrMissingHash(le)
-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
-                raise BadOrMissingHash(le)
-        d.addCallback(_got_share_hashes)
-        return d
-
-    def get_all_blockhashes(self):
-        """Retrieve and validate all the block-hash-tree nodes that are
-        included in this share. Each share contains a full Merkle tree, but
-        we usually only fetch the minimal subset necessary for any particular
-        block. This function fetches everything at once. The Verifier uses
-        this function to validate the block hash tree.
-
-        Call this (and wait for the Deferred it returns to fire) after
-        calling get_all_sharehashes() and before calling get_block() for the
-        first time: this lets us check that the share contains all block
-        hashes and avoids downloading them multiple times.
-
-        I return a Deferred which errbacks upon failure, probably with
-        BadOrMissingHash.
-        """
-
-        # get_block_hashes(anything) currently always returns everything
-        needed = list(range(len(self.block_hash_tree)))
-        d = self.bucket.get_block_hashes(needed)
-        def _got_block_hashes(blockhashes):
-            if len(blockhashes) < len(self.block_hash_tree):
-                raise BadOrMissingHash()
-            bh = dict(enumerate(blockhashes))
-
-            try:
-                self.block_hash_tree.set_hashes(bh)
-            except IndexError, le:
-                raise BadOrMissingHash(le)
-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
-                raise BadOrMissingHash(le)
-        d.addCallback(_got_block_hashes)
-        return d
-
-    def get_all_crypttext_hashes(self, crypttext_hash_tree):
-        """Retrieve and validate all the crypttext-hash-tree nodes that are
-        in this share. Normally we don't look at these at all: the download
-        process fetches them incrementally as needed to validate each segment
-        of ciphertext. But this is a convenient place to give the Verifier a
-        function to validate all of these at once.
-
-        Call this with a new hashtree object for each share, initialized with
-        the crypttext hash tree root. I return a Deferred which errbacks upon
-        failure, probably with BadOrMissingHash.
-        """
-
-        # get_crypttext_hashes() always returns everything
-        d = self.bucket.get_crypttext_hashes()
-        def _got_crypttext_hashes(hashes):
-            if len(hashes) < len(crypttext_hash_tree):
-                raise BadOrMissingHash()
-            ct_hashes = dict(enumerate(hashes))
-            try:
-                crypttext_hash_tree.set_hashes(ct_hashes)
-            except IndexError, le:
-                raise BadOrMissingHash(le)
-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
-                raise BadOrMissingHash(le)
-        d.addCallback(_got_crypttext_hashes)
-        return d
-
-    def get_block(self, blocknum):
-        # the first time we use this bucket, we need to fetch enough elements
-        # of the share hash tree to validate it from our share hash up to the
-        # hashroot.
-        if self.share_hash_tree.needed_hashes(self.sharenum):
-            d1 = self.bucket.get_share_hashes()
-        else:
-            d1 = defer.succeed([])
-
-        # We might need to grab some elements of our block hash tree, to
-        # validate the requested block up to the share hash.
-        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
-        # We don't need the root of the block hash tree, as that comes in the
-        # share tree.
-        blockhashesneeded.discard(0)
-        d2 = self.bucket.get_block_hashes(blockhashesneeded)
-
-        if blocknum < self.num_blocks-1:
-            thisblocksize = self.block_size
-        else:
-            thisblocksize = self.share_size % self.block_size
-            if thisblocksize == 0:
-                thisblocksize = self.block_size
-        d3 = self.bucket.get_block_data(blocknum,
-                                        self.block_size, thisblocksize)
-
-        dl = deferredutil.gatherResults([d1, d2, d3])
-        dl.addCallback(self._got_data, blocknum)
-        return dl
-
-    def _got_data(self, results, blocknum):
-        precondition(blocknum < self.num_blocks,
-                     self, blocknum, self.num_blocks)
-        sharehashes, blockhashes, blockdata = results
-        try:
-            sharehashes = dict(sharehashes)
-        except ValueError, le:
-            le.args = tuple(le.args + (sharehashes,))
-            raise
-        blockhashes = dict(enumerate(blockhashes))
-
-        candidate_share_hash = None # in case we log it in the except block below
-        blockhash = None # in case we log it in the except block below
-
-        try:
-            if self.share_hash_tree.needed_hashes(self.sharenum):
-                # This will raise exception if the values being passed do not
-                # match the root node of self.share_hash_tree.
-                try:
-                    self.share_hash_tree.set_hashes(sharehashes)
-                except IndexError, le:
-                    # Weird -- sharehashes contained index numbers outside of
-                    # the range that fit into this hash tree.
-                    raise BadOrMissingHash(le)
-
-            # To validate a block we need the root of the block hash tree,
-            # which is also one of the leafs of the share hash tree, and is
-            # called "the share hash".
-            if not self.block_hash_tree[0]: # empty -- no root node yet
-                # Get the share hash from the share hash tree.
-                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
-                if not share_hash:
-                    # No root node in block_hash_tree and also the share hash
-                    # wasn't sent by the server.
-                    raise hashtree.NotEnoughHashesError
-                self.block_hash_tree.set_hashes({0: share_hash})
-
-            if self.block_hash_tree.needed_hashes(blocknum):
-                self.block_hash_tree.set_hashes(blockhashes)
-
-            blockhash = hashutil.block_hash(blockdata)
-            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
-            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
-            #        "%r .. %r: %s" %
-            #        (self.sharenum, blocknum, len(blockdata),
-            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
-
-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
-            # log.WEIRD: indicates undetected disk/network error, or more
-            # likely a programming error
-            self.log("hash failure in block=%d, shnum=%d on %s" %
-                    (blocknum, self.sharenum, self.bucket))
-            if self.block_hash_tree.needed_hashes(blocknum):
-                self.log(""" failure occurred when checking the block_hash_tree.
-                This suggests that either the block data was bad, or that the
-                block hashes we received along with it were bad.""")
-            else:
-                self.log(""" the failure probably occurred when checking the
-                share_hash_tree, which suggests that the share hashes we
-                received from the remote peer were bad.""")
-            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
-            self.log(" block length: %d" % len(blockdata))
-            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
-            if len(blockdata) < 100:
-                self.log(" block data: %r" % (blockdata,))
-            else:
-                self.log(" block data start/end: %r .. %r" %
-                        (blockdata[:50], blockdata[-50:]))
-            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
-            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
-            lines = []
-            for i,h in sorted(sharehashes.items()):
-                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
-            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
-            lines = []
-            for i,h in blockhashes.items():
-                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
-            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
-            raise BadOrMissingHash(le)
-
-        # If we made it here, the block is good. If the hash trees didn't
-        # like what they saw, they would have raised a BadHashError, causing
-        # our caller to see a Failure and thus ignore this block (as well as
-        # dropping this bucket).
-        return blockdata
-
-
-
-class BlockDownloader(log.PrefixingLogMixin):
-    """I am responsible for downloading a single block (from a single bucket)
-    for a single segment.
-
-    I am a child of the SegmentDownloader.
-    """
-
-    def __init__(self, vbucket, blocknum, parent, results):
-        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
-        prefix = "%s-%d" % (vbucket, blocknum)
-        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
-        self.vbucket = vbucket
-        self.blocknum = blocknum
-        self.parent = parent
-        self.results = results
-
-    def start(self, segnum):
-        self.log("get_block(segnum=%d)" % segnum)
-        started = time.time()
-        d = self.vbucket.get_block(segnum)
-        d.addCallbacks(self._hold_block, self._got_block_error,
-                       callbackArgs=(started,))
-        return d
-
-    def _hold_block(self, data, started):
-        if self.results:
-            elapsed = time.time() - started
-            peerid = self.vbucket.bucket.get_peerid()
-            if peerid not in self.results.timings["fetch_per_server"]:
-                self.results.timings["fetch_per_server"][peerid] = []
-            self.results.timings["fetch_per_server"][peerid].append(elapsed)
-        self.log("got block")
-        self.parent.hold_block(self.blocknum, data)
-
-    def _got_block_error(self, f):
-        f.trap(RemoteException, DeadReferenceError,
-               IntegrityCheckReject, layout.LayoutInvalid,
-               layout.ShareVersionIncompatible)
-        if f.check(RemoteException, DeadReferenceError):
-            level = log.UNUSUAL
-        else:
-            level = log.WEIRD
-        self.log("failure to get block", level=level, umid="5Z4uHQ")
-        if self.results:
-            peerid = self.vbucket.bucket.get_peerid()
-            self.results.server_problems[peerid] = str(f)
-        self.parent.bucket_failed(self.vbucket)
-
-class SegmentDownloader:
-    """I am responsible for downloading all the blocks for a single segment
-    of data.
-
-    I am a child of the CiphertextDownloader.
-    """
-
-    def __init__(self, parent, segmentnumber, needed_shares, results):
-        self.parent = parent
-        self.segmentnumber = segmentnumber
-        self.needed_blocks = needed_shares
-        self.blocks = {} # k: blocknum, v: data
-        self.results = results
-        self._log_number = self.parent.log("starting segment %d" %
-                                           segmentnumber)
-
-    def log(self, *args, **kwargs):
-        if "parent" not in kwargs:
-            kwargs["parent"] = self._log_number
-        return self.parent.log(*args, **kwargs)
-
-    def start(self):
-        return self._download()
-
-    def _download(self):
-        d = self._try()
-        def _done(res):
-            if len(self.blocks) >= self.needed_blocks:
-                # we only need self.needed_blocks blocks
-                # we want to get the smallest blockids, because they are
-                # more likely to be fast "primary blocks"
-                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
-                blocks = []
-                for blocknum in blockids:
-                    blocks.append(self.blocks[blocknum])
-                return (blocks, blockids)
-            else:
-                return self._download()
-        d.addCallback(_done)
-        return d
-
-    def _try(self):
-        # fill our set of active buckets, maybe raising NotEnoughSharesError
-        active_buckets = self.parent._activate_enough_buckets()
-        # Now we have enough buckets, in self.parent.active_buckets.
-
-        # in test cases, bd.start might mutate active_buckets right away, so
-        # we need to put off calling start() until we've iterated all the way
-        # through it.
-        downloaders = []
-        for blocknum, vbucket in active_buckets.iteritems():
-            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
-            bd = BlockDownloader(vbucket, blocknum, self, self.results)
-            downloaders.append(bd)
-            if self.results:
-                self.results.servers_used.add(vbucket.bucket.get_peerid())
-        l = [bd.start(self.segmentnumber) for bd in downloaders]
-        return defer.DeferredList(l, fireOnOneErrback=True)
-
-    def hold_block(self, blocknum, data):
-        self.blocks[blocknum] = data
-
-    def bucket_failed(self, vbucket):
-        self.parent.bucket_failed(vbucket)
-
-class DownloadStatus:
-    implements(IDownloadStatus)
-    statusid_counter = itertools.count(0)
-
-    def __init__(self):
-        self.storage_index = None
-        self.size = None
-        self.helper = False
-        self.status = "Not started"
-        self.progress = 0.0
-        self.paused = False
-        self.stopped = False
-        self.active = True
-        self.results = None
-        self.counter = self.statusid_counter.next()
-        self.started = time.time()
-
-    def get_started(self):
-        return self.started
-    def get_storage_index(self):
-        return self.storage_index
-    def get_size(self):
-        return self.size
-    def using_helper(self):
-        return self.helper
-    def get_status(self):
-        status = self.status
-        if self.paused:
-            status += " (output paused)"
-        if self.stopped:
-            status += " (output stopped)"
-        return status
-    def get_progress(self):
-        return self.progress
-    def get_active(self):
-        return self.active
-    def get_results(self):
-        return self.results
-    def get_counter(self):
-        return self.counter
-
-    def set_storage_index(self, si):
-        self.storage_index = si
-    def set_size(self, size):
-        self.size = size
-    def set_helper(self, helper):
-        self.helper = helper
-    def set_status(self, status):
-        self.status = status
-    def set_paused(self, paused):
-        self.paused = paused
-    def set_stopped(self, stopped):
-        self.stopped = stopped
-    def set_progress(self, value):
-        self.progress = value
-    def set_active(self, value):
-        self.active = value
-    def set_results(self, value):
-        self.results = value
-
-class CiphertextDownloader(log.PrefixingLogMixin):
-    """ I download shares, check their integrity, then decode them, check the
-    integrity of the resulting ciphertext, then and write it to my target.
-    Before I send any new request to a server, I always ask the 'monitor'
-    object that was passed into my constructor whether this task has been
-    cancelled (by invoking its raise_if_cancelled() method)."""
-    implements(IPushProducer)
-    _status = None
-
-    def __init__(self, storage_broker, v, target, monitor):
-
-        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
-        precondition(IVerifierURI.providedBy(v), v)
-        precondition(IDownloadTarget.providedBy(target), target)
-
-        self._storage_broker = storage_broker
-        self._verifycap = v
-        self._storage_index = v.get_storage_index()
-        self._uri_extension_hash = v.uri_extension_hash
-
-        prefix=base32.b2a_l(self._storage_index[:8], 60)
-        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
-
-        self._started = time.time()
-        self._status = s = DownloadStatus()
-        s.set_status("Starting")
-        s.set_storage_index(self._storage_index)
-        s.set_size(self._verifycap.size)
-        s.set_helper(False)
-        s.set_active(True)
-
-        self._results = DownloadResults()
-        s.set_results(self._results)
-        self._results.file_size = self._verifycap.size
-        self._results.timings["servers_peer_selection"] = {}
-        self._results.timings["fetch_per_server"] = {}
-        self._results.timings["cumulative_fetch"] = 0.0
-        self._results.timings["cumulative_decode"] = 0.0
-        self._results.timings["cumulative_decrypt"] = 0.0
-        self._results.timings["paused"] = 0.0
-
-        self._paused = False
-        self._stopped = False
-        if IConsumer.providedBy(target):
-            target.registerProducer(self, True)
-        self._target = target
-        # Repairer (uploader) needs the storageindex.
-        self._target.set_storageindex(self._storage_index)
-        self._monitor = monitor
-        self._opened = False
-
-        self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = {} # k: sharenum, v: list of buckets
-
-        # _download_all_segments() will set this to:
-        # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
-        self._share_vbuckets = None
-
-        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
-
-        self._ciphertext_hasher = hashutil.crypttext_hasher()
-
-        self._bytes_done = 0
-        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
-
-        # _got_uri_extension() will create the following:
-        # self._crypttext_hash_tree
-        # self._share_hash_tree
-        # self._current_segnum = 0
-        # self._vup # ValidatedExtendedURIProxy
-
-        # _get_all_shareholders() will create the following:
-        # self._total_queries
-        # self._responses_received = 0
-        # self._queries_failed = 0
-
-        # This is solely for the use of unit tests. It will be triggered when
-        # we start downloading shares.
-        self._stage_4_d = defer.Deferred()
-
-    def pauseProducing(self):
-        if self._paused:
-            return
-        self._paused = defer.Deferred()
-        self._paused_at = time.time()
-        if self._status:
-            self._status.set_paused(True)
-
-    def resumeProducing(self):
-        if self._paused:
-            paused_for = time.time() - self._paused_at
-            self._results.timings['paused'] += paused_for
-            p = self._paused
-            self._paused = None
-            eventually(p.callback, None)
-            if self._status:
-                self._status.set_paused(False)
-
-    def stopProducing(self):
-        self.log("Download.stopProducing")
-        self._stopped = True
-        self.resumeProducing()
-        if self._status:
-            self._status.set_stopped(True)
-            self._status.set_active(False)
-
-    def start(self):
-        self.log("starting download")
-
-        # first step: who should we download from?
-        d = defer.maybeDeferred(self._get_all_shareholders)
-        d.addBoth(self._got_all_shareholders)
-        # now get the uri_extension block from somebody and integrity check
-        # it and parse and validate its contents
-        d.addCallback(self._obtain_uri_extension)
-        d.addCallback(self._get_crypttext_hash_tree)
-        # once we know that, we can download blocks from everybody
-        d.addCallback(self._download_all_segments)
-        def _finished(res):
-            if self._status:
-                self._status.set_status("Finished")
-                self._status.set_active(False)
-                self._status.set_paused(False)
-            if IConsumer.providedBy(self._target):
-                self._target.unregisterProducer()
-            return res
-        d.addBoth(_finished)
-        def _failed(why):
-            if self._status:
-                self._status.set_status("Failed")
-                self._status.set_active(False)
-            if why.check(DownloadStopped):
-                # DownloadStopped just means the consumer aborted the
-                # download; not so scary.
-                self.log("download stopped", level=log.UNUSUAL)
-            else:
-                # This is really unusual, and deserves maximum forensics.
-                self.log("download failed!", failure=why, level=log.SCARY,
-                         umid="lp1vaQ")
-            return why
-        d.addErrback(_failed)
-        d.addCallback(self._done)
-        return d
-
-    def _get_all_shareholders(self):
-        """ Once the number of buckets that I know about is >= K then I
-        callback the Deferred that I return.
-
-        If all of the get_buckets deferreds have fired (whether callback
-        or errback) and I still don't have enough buckets then I'll also
-        callback -- not errback -- the Deferred that I return.
-        """
-        wait_for_enough_buckets_d = defer.Deferred()
-        self._wait_for_enough_buckets_d = wait_for_enough_buckets_d
-
-        sb = self._storage_broker
-        servers = sb.get_servers_for_index(self._storage_index)
-        if not servers:
-            raise NoServersError("broker gave us no servers!")
-
-        self._total_queries = len(servers)
-        self._responses_received = 0
-        self._queries_failed = 0
-        for (peerid,ss) in servers:
-            self.log(format="sending DYHB to [%(peerid)s]",
-                     peerid=idlib.shortnodeid_b2a(peerid),
-                     level=log.NOISY, umid="rT03hg")
-            d = ss.callRemote("get_buckets", self._storage_index)
-            d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
-            d.addBoth(self._check_got_all_responses)
-
-        if self._status:
-            self._status.set_status("Locating Shares (%d/%d)" %
-                                    (self._responses_received,
-                                     self._total_queries))
-        return wait_for_enough_buckets_d
-
-    def _check_got_all_responses(self, ignored=None):
-        assert (self._responses_received+self._queries_failed) <= self._total_queries
-        if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries:
-            reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False)
-            self._wait_for_enough_buckets_d = None
-
-    def _got_response(self, buckets, peerid):
-        # Note that this can continue to receive responses after _wait_for_enough_buckets_d
-        # has fired.
-        self._responses_received += 1
-        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
-                 peerid=idlib.shortnodeid_b2a(peerid),
-                 shnums=sorted(buckets.keys()),
-                 level=log.NOISY, umid="o4uwFg")
-        if self._results:
-            elapsed = time.time() - self._started
-            self._results.timings["servers_peer_selection"][peerid] = elapsed
-        if self._status:
-            self._status.set_status("Locating Shares (%d/%d)" %
-                                    (self._responses_received,
-                                     self._total_queries))
-        for sharenum, bucket in buckets.iteritems():
-            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
-            self.add_share_bucket(sharenum, b)
-            # If we just got enough buckets for the first time, then fire the
-            # deferred. Then remove it from self so that we don't fire it
-            # again.
-            if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
-                reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
-                self._wait_for_enough_buckets_d = None
-
-            if self._share_vbuckets is not None:
-                vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
-                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
-
-            if self._results:
-                if peerid not in self._results.servermap:
-                    self._results.servermap[peerid] = set()
-                self._results.servermap[peerid].add(sharenum)
-
-    def add_share_bucket(self, sharenum, bucket):
-        # this is split out for the benefit of test_encode.py
-        self._share_buckets.setdefault(sharenum, []).append(bucket)
-
-    def _got_error(self, f):
-        self._queries_failed += 1
-        level = log.WEIRD
-        if f.check(DeadReferenceError):
-            level = log.UNUSUAL
-        self.log("Error during get_buckets", failure=f, level=level,
-                         umid="3uuBUQ")
-
-    def bucket_failed(self, vbucket):
-        shnum = vbucket.sharenum
-        del self.active_buckets[shnum]
-        s = self._share_vbuckets[shnum]
-        # s is a set of ValidatedReadBucketProxy instances
-        s.remove(vbucket)
-        # ... which might now be empty
-        if not s:
-            # there are no more buckets which can provide this share, so
-            # remove the key. This may prompt us to use a different share.
-            del self._share_vbuckets[shnum]
-
-    def _got_all_shareholders(self, res):
-        if self._results:
-            now = time.time()
-            self._results.timings["peer_selection"] = now - self._started
-
-        if len(self._share_buckets) < self._verifycap.needed_shares:
-            msg = "Failed to get enough shareholders: have %d, need %d" \
-                  % (len(self._share_buckets), self._verifycap.needed_shares)
-            if self._share_buckets:
-                raise NotEnoughSharesError(msg)
-            else:
-                raise NoSharesError(msg)
-
-        #for s in self._share_vbuckets.values():
-        #    for vb in s:
-        #        assert isinstance(vb, ValidatedReadBucketProxy), \
-        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
-
-    def _obtain_uri_extension(self, ignored):
-        # all shareholders are supposed to have a copy of uri_extension, and
-        # all are supposed to be identical. We compute the hash of the data
-        # that comes back, and compare it against the version in our URI. If
-        # they don't match, ignore their data and try someone else.
-        if self._status:
-            self._status.set_status("Obtaining URI Extension")
-
-        uri_extension_fetch_started = time.time()
-
-        vups = []
-        for sharenum, buckets in self._share_buckets.iteritems():
-            for bucket in buckets:
-                vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
-        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
-        d = vto.start()
-
-        def _got_uri_extension(vup):
-            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
-            if self._results:
-                elapsed = time.time() - uri_extension_fetch_started
-                self._results.timings["uri_extension"] = elapsed
-
-            self._vup = vup
-            self._codec = codec.CRSDecoder()
-            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
-            self._tail_codec = codec.CRSDecoder()
-            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
-
-            self._current_segnum = 0
-
-            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
-            self._share_hash_tree.set_hashes({0: vup.share_root_hash})
-
-            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
-            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
-
-            # Repairer (uploader) needs the encodingparams.
-            self._target.set_encodingparams((
-                self._verifycap.needed_shares,
-                0, # see ticket #778 for why this is
-                self._verifycap.total_shares,
-                self._vup.segment_size
-                ))
-        d.addCallback(_got_uri_extension)
-        return d
-
-    def _get_crypttext_hash_tree(self, res):
-        vchtps = []
-        for sharenum, buckets in self._share_buckets.iteritems():
-            for bucket in buckets:
-                vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
-                vchtps.append(vchtp)
-
-        _get_crypttext_hash_tree_started = time.time()
-        if self._status:
-            self._status.set_status("Retrieving crypttext hash tree")
-
-        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
-                                     log_id=self._parentmsgid)
-        d = vto.start()
-
-        def _got_crypttext_hash_tree(res):
-            # Good -- the self._crypttext_hash_tree that we passed to vchtp
-            # is now populated with hashes.
-            if self._results:
-                elapsed = time.time() - _get_crypttext_hash_tree_started
-                self._results.timings["hashtrees"] = elapsed
-        d.addCallback(_got_crypttext_hash_tree)
-        return d
-
-    def _activate_enough_buckets(self):
-        """either return a mapping from shnum to a ValidatedReadBucketProxy
-        that can provide data for that share, or raise NotEnoughSharesError"""
-
-        while len(self.active_buckets) < self._verifycap.needed_shares:
-            # need some more
-            handled_shnums = set(self.active_buckets.keys())
-            available_shnums = set(self._share_vbuckets.keys())
-            potential_shnums = list(available_shnums - handled_shnums)
-            if len(potential_shnums) < (self._verifycap.needed_shares
-                                        - len(self.active_buckets)):
-                have = len(potential_shnums) + len(self.active_buckets)
-                msg = "Unable to activate enough shares: have %d, need %d" \
-                      % (have, self._verifycap.needed_shares)
-                if have:
-                    raise NotEnoughSharesError(msg)
-                else:
-                    raise NoSharesError(msg)
-            # For the next share, choose a primary share if available, else a
-            # randomly chosen secondary share.
-            potential_shnums.sort()
-            if potential_shnums[0] < self._verifycap.needed_shares:
-                shnum = potential_shnums[0]
-            else:
-                shnum = random.choice(potential_shnums)
-            # and a random bucket that will provide it
-            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
-            self.active_buckets[shnum] = validated_bucket
-        return self.active_buckets
-
-
-    def _download_all_segments(self, res):
-        # From now on if new buckets are received then I will notice that
-        # self._share_vbuckets is not None and generate a vbucket for that new
-        # bucket and add it in to _share_vbuckets. (We had to wait because we
-        # didn't have self._vup and self._share_hash_tree earlier. We didn't
-        # need validated buckets until now -- now that we are ready to download
-        # shares.)
-        self._share_vbuckets = {}
-        for sharenum, buckets in self._share_buckets.iteritems():
-            for bucket in buckets:
-                vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
-                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
-
-        # after the above code, self._share_vbuckets contains enough
-        # buckets to complete the download, and some extra ones to
-        # tolerate some buckets dropping out or having
-        # errors. self._share_vbuckets is a dictionary that maps from
-        # shnum to a set of ValidatedBuckets, which themselves are
-        # wrappers around RIBucketReader references.
-        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
-
-        self._started_fetching = time.time()
-
-        d = defer.succeed(None)
-        for segnum in range(self._vup.num_segments):
-            d.addCallback(self._download_segment, segnum)
-            # this pause, at the end of write, prevents pre-fetch from
-            # happening until the consumer is ready for more data.
-            d.addCallback(self._check_for_pause)
-
-        self._stage_4_d.callback(None)
-        return d
-
-    def _check_for_pause(self, res):
-        if self._paused:
-            d = defer.Deferred()
-            self._paused.addCallback(lambda ignored: d.callback(res))
-            return d
-        if self._stopped:
-            raise DownloadStopped("our Consumer called stopProducing()")
-        self._monitor.raise_if_cancelled()
-        return res
-
-    def _download_segment(self, res, segnum):
-        if self._status:
-            self._status.set_status("Downloading segment %d of %d" %
-                                    (segnum+1, self._vup.num_segments))
-        self.log("downloading seg#%d of %d (%d%%)"
-                 % (segnum, self._vup.num_segments,
-                    100.0 * segnum / self._vup.num_segments))
-        # memory footprint: when the SegmentDownloader finishes pulling down
-        # all shares, we have 1*segment_size of usage.
-        segmentdler = SegmentDownloader(self, segnum,
-                                        self._verifycap.needed_shares,
-                                        self._results)
-        started = time.time()
-        d = segmentdler.start()
-        def _finished_fetching(res):
-            elapsed = time.time() - started
-            self._results.timings["cumulative_fetch"] += elapsed
-            return res
-        if self._results:
-            d.addCallback(_finished_fetching)
-        # pause before using more memory
-        d.addCallback(self._check_for_pause)
-        # while the codec does its job, we hit 2*segment_size
-        def _started_decode(res):
-            self._started_decode = time.time()
-            return res
-        if self._results:
-            d.addCallback(_started_decode)
-        if segnum + 1 == self._vup.num_segments:
-            codec = self._tail_codec
-        else:
-            codec = self._codec
-        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
-        # once the codec is done, we drop back to 1*segment_size, because
-        # 'shares' goes out of scope. The memory usage is all in the
-        # plaintext now, spread out into a bunch of tiny buffers.
-        def _finished_decode(res):
-            elapsed = time.time() - self._started_decode
-            self._results.timings["cumulative_decode"] += elapsed
-            return res
-        if self._results:
-            d.addCallback(_finished_decode)
-
-        # pause/check-for-stop just before writing, to honor stopProducing
-        d.addCallback(self._check_for_pause)
-        d.addCallback(self._got_segment)
-        return d
-
-    def _got_segment(self, buffers):
-        precondition(self._crypttext_hash_tree)
-        started_decrypt = time.time()
-        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
-
-        if self._current_segnum + 1 == self._vup.num_segments:
-            # This is the last segment.
-            # Trim off any padding added by the upload side. We never send
-            # empty segments. If the data was an exact multiple of the
-            # segment size, the last segment will be full.
-            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
-            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
-            # Remove buffers which don't contain any part of the tail.
-            del buffers[num_buffers_used:]
-            # Remove the past-the-tail-part of the last buffer.
-            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
-            if tail_in_last_buf == 0:
-                tail_in_last_buf = tail_buf_size
-            buffers[-1] = buffers[-1][:tail_in_last_buf]
-
-        # First compute the hash of this segment and check that it fits.
-        ch = hashutil.crypttext_segment_hasher()
-        for buffer in buffers:
-            self._ciphertext_hasher.update(buffer)
-            ch.update(buffer)
-        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
-
-        # Then write this segment to the target.
-        if not self._opened:
-            self._opened = True
-            self._target.open(self._verifycap.size)
-
-        for buffer in buffers:
-            self._target.write(buffer)
-            self._bytes_done += len(buffer)
-
-        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
-        self._current_segnum += 1
-
-        if self._results:
-            elapsed = time.time() - started_decrypt
-            self._results.timings["cumulative_decrypt"] += elapsed
-
-    def _done(self, res):
-        self.log("download done")
-        if self._results:
-            now = time.time()
-            self._results.timings["total"] = now - self._started
-            self._results.timings["segments"] = now - self._started_fetching
-        if self._vup.crypttext_hash:
-            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
-                    "bad crypttext_hash: computed=%s, expected=%s" %
-                    (base32.b2a(self._ciphertext_hasher.digest()),
-                     base32.b2a(self._vup.crypttext_hash)))
-        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
-        self._status.set_progress(1)
-        self._target.close()
-        return self._target.finish()
-    def get_download_status(self):
-        return self._status
-
-
-class ConsumerAdapter:
-    implements(IDownloadTarget, IConsumer)
-    def __init__(self, consumer):
-        self._consumer = consumer
-
-    def registerProducer(self, producer, streaming):
-        self._consumer.registerProducer(producer, streaming)
-    def unregisterProducer(self):
-        self._consumer.unregisterProducer()
-
-    def open(self, size):
-        pass
-    def write(self, data):
-        self._consumer.write(data)
-    def close(self):
-        pass
-
-    def fail(self, why):
-        pass
-    def register_canceller(self, cb):
-        pass
-    def finish(self):
-        return self._consumer
-    # The following methods are just because the target might be a
-    # repairer.DownUpConnector, and just because the current CHKUpload object
-    # expects to find the storage index and encoding parameters in its
-    # Uploadable.
-    def set_storageindex(self, storageindex):
-        pass
-    def set_encodingparams(self, encodingparams):
-        pass
-
-
-class Downloader:
-    """I am a service that allows file downloading.
-    """
-    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
-    # It is scheduled to go away, to be replaced by filenode.download()
-    implements(IDownloader)
-
-    def __init__(self, storage_broker, stats_provider):
-        self.storage_broker = storage_broker
-        self.stats_provider = stats_provider
-        self._all_downloads = weakref.WeakKeyDictionary() # for debugging
-
-    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
-        assert isinstance(u, uri.CHKFileURI)
-        t = IDownloadTarget(t)
-        assert t.write
-        assert t.close
-
-        if self.stats_provider:
-            # these counters are meant for network traffic, and don't
-            # include LIT files
-            self.stats_provider.count('downloader.files_downloaded', 1)
-            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
-
-        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
-        if not monitor:
-            monitor=Monitor()
-        dl = CiphertextDownloader(self.storage_broker,
-                                  u.get_verify_cap(), target,
-                                  monitor=monitor)
-        self._all_downloads[dl] = None
-        if history:
-            history.add_download(dl.get_download_status())
-        d = dl.start()
-        return d
diff --git a/src/allmydata/immutable/downloader/__init__.py b/src/allmydata/immutable/downloader/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/allmydata/immutable/downloader/common.py b/src/allmydata/immutable/downloader/common.py
new file mode 100644
index 00000000..e9dd2719
--- /dev/null
+++ b/src/allmydata/immutable/downloader/common.py
@@ -0,0 +1,13 @@
+
+(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
+ ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")
+
+class BadSegmentNumberError(Exception):
+    pass
+class WrongSegmentError(Exception):
+    pass
+class BadCiphertextHashError(Exception):
+    pass
+
+class DownloadStopped(Exception):
+    pass
diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py
new file mode 100644
index 00000000..3918f659
--- /dev/null
+++ b/src/allmydata/immutable/downloader/fetcher.py
@@ -0,0 +1,229 @@
+
+from twisted.python.failure import Failure
+from foolscap.api import eventually
+from allmydata.interfaces import NotEnoughSharesError, NoSharesError
+from allmydata.util import log
+from allmydata.util.dictutil import DictOfSets
+from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
+     BADSEGNUM, BadSegmentNumberError
+
+class SegmentFetcher:
+    """I am responsible for acquiring blocks for a single segment. I will use
+    the Share instances passed to my add_shares() method to locate, retrieve,
+    and validate those blocks. I expect my parent node to call my
+    no_more_shares() method when there are no more shares available. I will
+    call my parent's want_more_shares() method when I want more: I expect to
+    see at least one call to add_shares or no_more_shares afterwards.
+
+    When I have enough validated blocks, I will call my parent's
+    process_blocks() method with a dictionary that maps shnum to blockdata.
+    If I am unable to provide enough blocks, I will call my parent's
+    fetch_failed() method with (self, f). After either of these events, I
+    will shut down and do no further work. My parent can also call my stop()
+    method to have me shut down early."""
+
+    def __init__(self, node, segnum, k):
+        self._node = node # _Node
+        self.segnum = segnum
+        self._k = k
+        self._shares = {} # maps non-dead Share instance to a state, one of
+                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
+                          # State transition map is:
+                          #  AVAILABLE -(send-read)-> PENDING
+                          #  PENDING -(timer)-> OVERDUE
+                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
+                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
+                          # If a share becomes DEAD, it is removed from the
+                          # dict. If it becomes BADSEGNUM, the whole fetch is
+                          # terminated.
+        self._share_observers = {} # maps Share to EventStreamObserver for
+                                   # active ones
+        self._shnums = DictOfSets() # maps shnum to the shares that provide it
+        self._blocks = {} # maps shnum to validated block data
+        self._no_more_shares = False
+        self._bad_segnum = False
+        self._last_failure = None
+        self._running = True
+
+    def stop(self):
+        log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
+                level=log.NOISY, umid="LWyqpg")
+        self._cancel_all_requests()
+        self._running = False
+        self._shares.clear() # let GC work # ??? XXX
+
+
+    # called by our parent _Node
+
+    def add_shares(self, shares):
+        # called when ShareFinder locates a new share, and when a non-initial
+        # segment fetch is started and we already know about shares from the
+        # previous segment
+        for s in shares:
+            self._shares[s] = AVAILABLE
+            self._shnums.add(s._shnum, s)
+        eventually(self.loop)
+
+    def no_more_shares(self):
+        # ShareFinder tells us it's reached the end of its list
+        self._no_more_shares = True
+        eventually(self.loop)
+
+    # internal methods
+
+    def _count_shnums(self, *states):
+        """shnums for which at least one state is in the following list"""
+        shnums = []
+        for shnum,shares in self._shnums.iteritems():
+            matches = [s for s in shares if self._shares.get(s) in states]
+            if matches:
+                shnums.append(shnum)
+        return len(shnums)
+
+    def loop(self):
+        try:
+            # if any exception occurs here, kill the download
+            self._do_loop()
+        except BaseException:
+            self._node.fetch_failed(self, Failure())
+            raise
+
+    def _do_loop(self):
+        k = self._k
+        if not self._running:
+            return
+        if self._bad_segnum:
+            # oops, we were asking for a segment number beyond the end of the
+            # file. This is an error.
+            self.stop()
+            e = BadSegmentNumberError("segnum=%d, numsegs=%d" %
+                                      (self.segnum, self._node.num_segments))
+            f = Failure(e)
+            self._node.fetch_failed(self, f)
+            return
+
+        # are we done?
+        if self._count_shnums(COMPLETE) >= k:
+            # yay!
+            self.stop()
+            self._node.process_blocks(self.segnum, self._blocks)
+            return
+
+        # we may have exhausted everything
+        if (self._no_more_shares and
+            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
+            # no more new shares are coming, and the remaining hopeful shares
+            # aren't going to be enough. boo!
+
+            log.msg("share states: %r" % (self._shares,),
+                    level=log.NOISY, umid="0ThykQ")
+            if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
+                format = ("no shares (need %(k)d)."
+                          " Last failure: %(last_failure)s")
+                args = { "k": k,
+                         "last_failure": self._last_failure }
+                error = NoSharesError
+            else:
+                format = ("ran out of shares: %(complete)d complete,"
+                          " %(pending)d pending, %(overdue)d overdue,"
+                          " %(unused)d unused, need %(k)d."
+                          " Last failure: %(last_failure)s")
+                args = {"complete": self._count_shnums(COMPLETE),
+                        "pending": self._count_shnums(PENDING),
+                        "overdue": self._count_shnums(OVERDUE),
+                        # 'unused' should be zero
+                        "unused": self._count_shnums(AVAILABLE),
+                        "k": k,
+                        "last_failure": self._last_failure,
+                        }
+                error = NotEnoughSharesError
+            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
+            e = error(format % args)
+            f = Failure(e)
+            self.stop()
+            self._node.fetch_failed(self, f)
+            return
+
+        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
+        # more read requests, or do we think we have enough in flight
+        # already?)
+        while self._count_shnums(PENDING, COMPLETE) < k:
+            # we're hungry.. are there any unused shares?
+            sent = self._send_new_request()
+            if not sent:
+                break
+
+        # ok, now are we "share-hungry" (i.e. do we have enough known shares
+        # to make us happy, or should we ask the ShareFinder to get us more?)
+        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
+            # we're hungry for more shares
+            self._node.want_more_shares()
+            # that will trigger the ShareFinder to keep looking
+
+    def _find_one(self, shares, state):
+        # TODO could choose fastest
+        for s in shares:
+            if self._shares[s] == state:
+                return s
+        # can never get here, caller has assert in case of code bug
+
+    def _send_new_request(self):
+        for shnum,shares in sorted(self._shnums.iteritems()):
+            states = [self._shares[s] for s in shares]
+            if COMPLETE in states or PENDING in states:
+                # don't send redundant requests
+                continue
+            if AVAILABLE not in states:
+                # no candidates for this shnum, move on
+                continue
+            # here's a candidate. Send a request.
+            s = self._find_one(shares, AVAILABLE)
+            assert s
+            self._shares[s] = PENDING
+            self._share_observers[s] = o = s.get_block(self.segnum)
+            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
+            # TODO: build up a list of candidates, then walk through the
+            # list, sending requests to the most desireable servers,
+            # re-checking our block-hunger each time. For non-initial segment
+            # fetches, this would let us stick with faster servers.
+            return True
+        # nothing was sent: don't call us again until you have more shares to
+        # work with, or one of the existing shares has been declared OVERDUE
+        return False
+
+    def _cancel_all_requests(self):
+        for o in self._share_observers.values():
+            o.cancel()
+        self._share_observers = {}
+
+    def _block_request_activity(self, share, shnum, state, block=None, f=None):
+        # called by Shares, in response to our s.send_request() calls.
+        if not self._running:
+            return
+        log.msg("SegmentFetcher(%s)._block_request_activity:"
+                " Share(sh%d-on-%s) -> %s" %
+                (self._node._si_prefix, shnum, share._peerid_s, state),
+                level=log.NOISY, umid="vilNWA")
+        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
+        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
+            self._share_observers.pop(share, None)
+        if state is COMPLETE:
+            # 'block' is fully validated
+            self._shares[share] = COMPLETE
+            self._blocks[shnum] = block
+        elif state is OVERDUE:
+            self._shares[share] = OVERDUE
+            # OVERDUE is not terminal: it will eventually transition to
+            # COMPLETE, CORRUPT, or DEAD.
+        elif state is CORRUPT:
+            self._shares[share] = CORRUPT
+        elif state is DEAD:
+            del self._shares[share]
+            self._shnums[shnum].remove(share)
+            self._last_failure = f
+        elif state is BADSEGNUM:
+            self._shares[share] = BADSEGNUM # ???
+            self._bad_segnum = True
+        eventually(self.loop)
+
+
diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
new file mode 100644
index 00000000..1a2a94da
--- /dev/null
+++ b/src/allmydata/immutable/downloader/finder.py
@@ -0,0 +1,202 @@
+
+import time
+now = time.time
+from foolscap.api import eventually
+from allmydata.util import base32, log, idlib
+
+from share import Share, CommonShare
+
+def incidentally(res, f, *args, **kwargs):
+    """Add me to a Deferred chain like this:
+     d.addBoth(incidentally, func, arg)
+    and I'll behave as if you'd added the following function:
+     def _(res):
+         func(arg)
+         return res
+    This is useful if you want to execute an expression when the Deferred
+    fires, but don't care about its value.
+    """
+    f(*args, **kwargs)
+    return res
+
+class RequestToken:
+    def __init__(self, peerid):
+        self.peerid = peerid
+
+class ShareFinder:
+    def __init__(self, storage_broker, verifycap, node, download_status,
+                 logparent=None, max_outstanding_requests=10):
+        self.running = True # stopped by Share.stop, from Terminator
+        self.verifycap = verifycap
+        self._started = False
+        self._storage_broker = storage_broker
+        self.share_consumer = self.node = node
+        self.max_outstanding_requests = max_outstanding_requests
+
+        self._hungry = False
+
+        self._commonshares = {} # shnum to CommonShare instance
+        self.undelivered_shares = []
+        self.pending_requests = set()
+
+        self._storage_index = verifycap.storage_index
+        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
+        self._node_logparent = logparent
+        self._download_status = download_status
+        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
+                           si=self._si_prefix,
+                           level=log.NOISY, parent=logparent, umid="2xjj2A")
+
+    def start_finding_servers(self):
+        # don't get servers until somebody uses us: creating the
+        # ImmutableFileNode should not cause work to happen yet. Test case is
+        # test_dirnode, which creates us with storage_broker=None
+        if not self._started:
+            si = self.verifycap.storage_index
+            s = self._storage_broker.get_servers_for_index(si)
+            self._servers = iter(s)
+            self._started = True
+
+    def log(self, *args, **kwargs):
+        if "parent" not in kwargs:
+            kwargs["parent"] = self._lp
+        return log.msg(*args, **kwargs)
+
+    def stop(self):
+        self.running = False
+
+    # called by our parent CiphertextDownloader
+    def hungry(self):
+        self.log(format="ShareFinder[si=%(si)s] hungry",
+                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
+        self.start_finding_servers()
+        self._hungry = True
+        eventually(self.loop)
+
+    # internal methods
+    def loop(self):
+        undelivered_s = ",".join(["sh%d@%s" %
+                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
+                                  for s in self.undelivered_shares])
+        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
+                              for rt in self.pending_requests]) # sort?
+        self.log(format="ShareFinder loop: running=%(running)s"
+                 " hungry=%(hungry)s, undelivered=%(undelivered)s,"
+                 " pending=%(pending)s",
+                 running=self.running, hungry=self._hungry,
+                 undelivered=undelivered_s, pending=pending_s,
+                 level=log.NOISY, umid="kRtS4Q")
+        if not self.running:
+            return
+        if not self._hungry:
+            return
+        if self.undelivered_shares:
+            sh = self.undelivered_shares.pop(0)
+            # they will call hungry() again if they want more
+            self._hungry = False
+            self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
+                     shnum=sh._shnum, peerid=sh._peerid_s,
+                     level=log.NOISY, umid="2n1qQw")
+            eventually(self.share_consumer.got_shares, [sh])
+            return
+
+        if len(self.pending_requests) >= self.max_outstanding_requests:
+            # cannot send more requests, must wait for some to retire
+            return
+
+        server = None
+        try:
+            if self._servers:
+                server = self._servers.next()
+        except StopIteration:
+            self._servers = None
+
+        if server:
+            self.send_request(server)
+            # we loop again to get parallel queries. The check above will
+            # prevent us from looping forever.
+            eventually(self.loop)
+            return
+
+        if self.pending_requests:
+            # no server, but there are still requests in flight: maybe one of
+            # them will make progress
+            return
+
+        self.log(format="ShareFinder.loop: no_more_shares, ever",
+                 level=log.UNUSUAL, umid="XjQlzg")
+        # we've run out of servers (so we can't send any more requests), and
+        # we have nothing in flight. No further progress can be made. They
+        # are destined to remain hungry.
+        self.share_consumer.no_more_shares()
+
+    def send_request(self, server):
+        peerid, rref = server
+        req = RequestToken(peerid)
+        self.pending_requests.add(req)
+        lp = self.log(format="sending DYHB to [%(peerid)s]",
+                      peerid=idlib.shortnodeid_b2a(peerid),
+                      level=log.NOISY, umid="Io7pyg")
+        d_ev = self._download_status.add_dyhb_sent(peerid, now())
+        d = rref.callRemote("get_buckets", self._storage_index)
+        d.addBoth(incidentally, self.pending_requests.discard, req)
+        d.addCallbacks(self._got_response, self._got_error,
+                       callbackArgs=(rref.version, peerid, req, d_ev, lp),
+                       errbackArgs=(peerid, req, d_ev, lp))
+        d.addErrback(log.err, format="error in send_request",
+                     level=log.WEIRD, parent=lp, umid="rpdV0w")
+        d.addCallback(incidentally, eventually, self.loop)
+
+    def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
+        shnums = sorted([shnum for shnum in buckets])
+        d_ev.finished(shnums, now())
+        if buckets:
+            shnums_s = ",".join([str(shnum) for shnum in shnums])
+            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
+                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+                     level=log.NOISY, parent=lp, umid="0fcEZw")
+        else:
+            self.log(format="no shares from [%(peerid)s]",
+                     peerid=idlib.shortnodeid_b2a(peerid),
+                     level=log.NOISY, parent=lp, umid="U7d4JA")
+        if self.node.num_segments is None:
+            best_numsegs = self.node.guessed_num_segments
+        else:
+            best_numsegs = self.node.num_segments
+        for shnum, bucket in buckets.iteritems():
+            self._create_share(best_numsegs, shnum, bucket, server_version,
+                               peerid)
+
+    def _create_share(self, best_numsegs, shnum, bucket, server_version,
+                      peerid):
+        if shnum in self._commonshares:
+            cs = self._commonshares[shnum]
+        else:
+            cs = CommonShare(best_numsegs, self._si_prefix, shnum,
+                             self._node_logparent)
+            # Share._get_satisfaction is responsible for updating
+            # CommonShare.set_numsegs after we know the UEB. Alternatives:
+            #  1: d = self.node.get_num_segments()
+            #     d.addCallback(cs.got_numsegs)
+            #   the problem is that the OneShotObserverList I was using
+            #   inserts an eventual-send between _get_satisfaction's
+            #   _satisfy_UEB and _satisfy_block_hash_tree, and the
+            #   CommonShare didn't get the num_segs message before
+            #   being asked to set block hash values. To resolve this
+            #   would require an immediate ObserverList instead of
+            #   an eventual-send -based one
+            #  2: break _get_satisfaction into Deferred-attached pieces.
+            #     Yuck.
+            self._commonshares[shnum] = cs
+        s = Share(bucket, server_version, self.verifycap, cs, self.node,
+                  self._download_status, peerid, shnum,
+                  self._node_logparent)
+        self.undelivered_shares.append(s)
+
+    def _got_error(self, f, peerid, req, d_ev, lp):
+        d_ev.finished("error", now())
+        self.log(format="got error from [%(peerid)s]",
+                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
+                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
+
+
diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
new file mode 100644
index 00000000..2991c9ee
--- /dev/null
+++ b/src/allmydata/immutable/downloader/node.py
@@ -0,0 +1,471 @@
+
+import time
+now = time.time
+from twisted.python.failure import Failure
+from twisted.internet import defer
+from foolscap.api import eventually
+from allmydata import uri
+from allmydata.codec import CRSDecoder
+from allmydata.util import base32, log, hashutil, mathutil, observer
+from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
+from allmydata.hashtree import IncompleteHashTree, BadHashError, \
+     NotEnoughHashesError
+
+# local imports
+from finder import ShareFinder
+from fetcher import SegmentFetcher
+from segmentation import Segmentation
+from common import BadCiphertextHashError
+
+class Cancel:
+    def __init__(self, f):
+        self._f = f
+        self.cancelled = False
+    def cancel(self):
+        if not self.cancelled:
+            self.cancelled = True
+            self._f(self)
+
+class DownloadNode:
+    """Internal class which manages downloads and holds state. External
+    callers use CiphertextFileNode instead."""
+
+    # Share._node points to me
+    def __init__(self, verifycap, storage_broker, secret_holder,
+                 terminator, history, download_status):
+        assert isinstance(verifycap, uri.CHKFileVerifierURI)
+        self._verifycap = verifycap
+        self._storage_broker = storage_broker
+        self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
+        self.running = True
+        if terminator:
+            terminator.register(self) # calls self.stop() at stopService()
+        # the rules are:
+        # 1: Only send network requests if you're active (self.running is True)
+        # 2: Use TimerService, not reactor.callLater
+        # 3: You can do eventual-sends any time.
+        # These rules should mean that once
+        # stopService()+flushEventualQueue() fires, everything will be done.
+        self._secret_holder = secret_holder
+        self._history = history
+        self._download_status = download_status
+
+        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
+        self.share_hash_tree = IncompleteHashTree(N)
+
+        # we guess the segment size, so Segmentation can pull non-initial
+        # segments in a single roundtrip. This populates
+        # .guessed_segment_size, .guessed_num_segments, and
+        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
+        # we'll need)
+        self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)
+
+        # filled in when we parse a valid UEB
+        self.have_UEB = False
+        self.segment_size = None
+        self.tail_segment_size = None
+        self.tail_segment_padded = None
+        self.num_segments = None
+        self.block_size = None
+        self.tail_block_size = None
+
+        # things to track callers that want data
+
+        # _segment_requests can have duplicates
+        self._segment_requests = [] # (segnum, d, cancel_handle)
+        self._active_segment = None # a SegmentFetcher, with .segnum
+
+        self._segsize_observers = observer.OneShotObserverList()
+
+        # we create one top-level logparent for this _Node, and another one
+        # for each read() call. Segmentation and get_segment() messages are
+        # associated with the read() call, everything else is tied to the
+        # _Node's log entry.
+        lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
+                     " guessed_segsize=%(guessed_segsize)d,"
+                     " guessed_numsegs=%(guessed_numsegs)d",
+                     si=self._si_prefix, size=verifycap.size,
+                     guessed_segsize=self.guessed_segment_size,
+                     guessed_numsegs=self.guessed_num_segments,
+                     level=log.OPERATIONAL, umid="uJ0zAQ")
+        self._lp = lp
+
+        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
+                                        self._download_status, lp)
+        self._shares = set()
+
+    def _build_guessed_tables(self, max_segment_size):
+        size = min(self._verifycap.size, max_segment_size)
+        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
+        self.guessed_segment_size = s
+        r = self._calculate_sizes(self.guessed_segment_size)
+        self.guessed_num_segments = r["num_segments"]
+        # as with CommonShare, our ciphertext_hash_tree is a stub until we
+        # get the real num_segments
+        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
+
+    def __repr__(self):
+        return "Imm_Node(%s)" % (self._si_prefix,)
+
+    def stop(self):
+        # called by the Terminator at shutdown, mostly for tests
+        if self._active_segment:
+            self._active_segment.stop()
+            self._active_segment = None
+        self._sharefinder.stop()
+
+    # things called by outside callers, via CiphertextFileNode. get_segment()
+    # may also be called by Segmentation.
+
+    def read(self, consumer, offset=0, size=None, read_ev=None):
+        """I am the main entry point, from which FileNode.read() can get
+        data. I feed the consumer with the desired range of ciphertext. I
+        return a Deferred that fires (with the consumer) when the read is
+        finished.
+
+        Note that there is no notion of a 'file pointer': each call to read()
+        uses an independent offset= value."""
+        # for concurrent operations: each gets its own Segmentation manager
+        if size is None:
+            size = self._verifycap.size
+        # clip size so offset+size does not go past EOF
+        size = min(size, self._verifycap.size-offset)
+        if read_ev is None:
+            read_ev = self._download_status.add_read_event(offset, size, now())
+
+        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
+                     si=base32.b2a(self._verifycap.storage_index)[:8],
+                     offset=offset, size=size,
+                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
+        if self._history:
+            sp = self._history.stats_provider
+            sp.count("downloader.files_downloaded", 1) # really read() calls
+            sp.count("downloader.bytes_downloaded", size)
+        s = Segmentation(self, offset, size, consumer, read_ev, lp)
+        # this raises an interesting question: what segments to fetch? if
+        # offset=0, always fetch the first segment, and then allow
+        # Segmentation to be responsible for pulling the subsequent ones if
+        # the first wasn't large enough. If offset>0, we're going to need an
+        # extra roundtrip to get the UEB (and therefore the segment size)
+        # before we can figure out which segment to get. TODO: allow the
+        # offset-table-guessing code (which starts by guessing the segsize)
+        # to assist the offset>0 process.
+        d = s.start()
+        def _done(res):
+            read_ev.finished(now())
+            return res
+        d.addBoth(_done)
+        return d
+
+    def get_segment(self, segnum, logparent=None):
+        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
+        Deferred that fires with (offset,data) when the desired segment is
+        available, and c is an object on which c.cancel() can be called to
+        disavow interest in the segment (after which 'd' will never fire).
+
+        You probably need to know the segment size before calling this,
+        unless you want the first few bytes of the file. If you ask for a
+        segment number which turns out to be too large, the Deferred will
+        errback with BadSegmentNumberError.
+
+        The Deferred fires with the offset of the first byte of the data
+        segment, so that you can call get_segment() before knowing the
+        segment size, and still know which data you received.
+
+        The Deferred can also errback with other fatal problems, such as
+        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
+        """
+        log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
+                si=base32.b2a(self._verifycap.storage_index)[:8],
+                segnum=segnum,
+                level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
+        self._download_status.add_segment_request(segnum, now())
+        d = defer.Deferred()
+        c = Cancel(self._cancel_request)
+        self._segment_requests.append( (segnum, d, c) )
+        self._start_new_segment()
+        return (d, c)
+
+    def get_segsize(self):
+        """Return a Deferred that fires when we know the real segment size."""
+        if self.segment_size:
+            return defer.succeed(self.segment_size)
+        # TODO: this downloads (and discards) the first segment of the file.
+        # We could make this more efficient by writing
+        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
+        # share and extracting the UEB. We'd add Share.get_UEB() to request
+        # just the UEB.
+        (d,c) = self.get_segment(0)
+        # this ensures that an error during get_segment() will errback the
+        # caller, so Repair won't wait forever on completely missing files
+        d.addCallback(lambda ign: self._segsize_observers.when_fired())
+        return d
+
+    # things called by the Segmentation object used to transform
+    # arbitrary-sized read() calls into quantized segment fetches
+
+    def _start_new_segment(self):
+        if self._active_segment is None and self._segment_requests:
+            segnum = self._segment_requests[0][0]
+            k = self._verifycap.needed_shares
+            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
+                    node=repr(self), segnum=segnum,
+                    level=log.NOISY, umid="wAlnHQ")
+            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
+            active_shares = [s for s in self._shares if s.is_alive()]
+            fetcher.add_shares(active_shares) # this triggers the loop
+
+
+    # called by our child ShareFinder
+    def got_shares(self, shares):
+        self._shares.update(shares)
+        if self._active_segment:
+            self._active_segment.add_shares(shares)
+    def no_more_shares(self):
+        self._no_more_shares = True
+        if self._active_segment:
+            self._active_segment.no_more_shares()
+
+    # things called by our Share instances
+
+    def validate_and_store_UEB(self, UEB_s):
+        log.msg("validate_and_store_UEB",
+                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
+        h = hashutil.uri_extension_hash(UEB_s)
+        if h != self._verifycap.uri_extension_hash:
+            raise BadHashError
+        UEB_dict = uri.unpack_extension(UEB_s)
+        self._parse_and_store_UEB(UEB_dict) # sets self._stuff
+        # TODO: a malformed (but authentic) UEB could throw an assertion in
+        # _parse_and_store_UEB, and we should abandon the download.
+        self.have_UEB = True
+
+    def _parse_and_store_UEB(self, d):
+        # Note: the UEB contains needed_shares and total_shares. These are
+        # redundant and inferior (the filecap contains the authoritative
+        # values). However, because it is possible to encode the same file in
+        # multiple ways, and the encoders might choose (poorly) to use the
+        # same key for both (therefore getting the same SI), we might
+        # encounter shares for both types. The UEB hashes will be different,
+        # however, and we'll disregard the "other" encoding's shares as
+        # corrupted.
+
+        # therefore, we ignore d['total_shares'] and d['needed_shares'].
+
+        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
+                ueb=repr(d), vcap=self._verifycap.to_string(),
+                level=log.NOISY, parent=self._lp, umid="cVqZnA")
+
+        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
+
+        self.segment_size = d['segment_size']
+        self._segsize_observers.fire(self.segment_size)
+
+        r = self._calculate_sizes(self.segment_size)
+        self.tail_segment_size = r["tail_segment_size"]
+        self.tail_segment_padded = r["tail_segment_padded"]
+        self.num_segments = r["num_segments"]
+        self.block_size = r["block_size"]
+        self.tail_block_size = r["tail_block_size"]
+        log.msg("actual sizes: %s" % (r,),
+                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
+        if (self.segment_size == self.guessed_segment_size
+            and self.num_segments == self.guessed_num_segments):
+            log.msg("my guess was right!",
+                    level=log.NOISY, parent=self._lp, umid="x340Ow")
+        else:
+            log.msg("my guess was wrong! Extra round trips for me.",
+                    level=log.NOISY, parent=self._lp, umid="tb7RJw")
+
+        # zfec.Decode() instantiation is fast, but still, let's use the same
+        # codec instance for all but the last segment. 3-of-10 takes 15us on
+        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
+        # 2.5ms, worst-case 254-of-255 is 9.3ms
+        self._codec = CRSDecoder()
+        self._codec.set_params(self.segment_size, k, N)
+
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B. self.ciphertext_hash_tree was a guess before:
+        # this is where we create it for real.
+        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
+        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
+
+        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
+
+        # Our job is a fast download, not verification, so we ignore any
+        # redundant fields. The Verifier uses a different code path which
+        # does not ignore them.
+
+    def _calculate_sizes(self, segment_size):
+        # segments of ciphertext
+        size = self._verifycap.size
+        k = self._verifycap.needed_shares
+
+        # this assert matches the one in encode.py:127 inside
+        # Encoded._got_all_encoding_parameters, where the UEB is constructed
+        assert segment_size % k == 0
+
+        # the last segment is usually short. We don't store a whole segsize,
+        # but we do pad the segment up to a multiple of k, because the
+        # encoder requires that.
+        tail_segment_size = size % segment_size
+        if tail_segment_size == 0:
+            tail_segment_size = segment_size
+        padded = mathutil.next_multiple(tail_segment_size, k)
+        tail_segment_padded = padded
+
+        num_segments = mathutil.div_ceil(size, segment_size)
+
+        # each segment is turned into N blocks. All but the last are of size
+        # block_size, and the last is of size tail_block_size
+        block_size = segment_size / k
+        tail_block_size = tail_segment_padded / k
+
+        return { "tail_segment_size": tail_segment_size,
+                 "tail_segment_padded": tail_segment_padded,
+                 "num_segments": num_segments,
+                 "block_size": block_size,
+                 "tail_block_size": tail_block_size,
+                 }
+
+
+    def process_share_hashes(self, share_hashes):
+        for hashnum in share_hashes:
+            if hashnum >= len(self.share_hash_tree):
+                # "BadHashError" is normally for e.g. a corrupt block. We
+                # sort of abuse it here to mean a badly numbered hash (which
+                # indicates corruption in the number bytes, rather than in
+                # the data bytes).
+                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
+                                   % (hashnum, len(self.share_hash_tree)))
+        self.share_hash_tree.set_hashes(share_hashes)
+
+    def get_needed_ciphertext_hashes(self, segnum):
+        cht = self.ciphertext_hash_tree
+        return cht.needed_hashes(segnum, include_leaf=True)
+    def process_ciphertext_hashes(self, hashes):
+        assert self.num_segments is not None
+        # this may raise BadHashError or NotEnoughHashesError
+        self.ciphertext_hash_tree.set_hashes(hashes)
+
+
+    # called by our child SegmentFetcher
+
+    def want_more_shares(self):
+        self._sharefinder.hungry()
+
+    def fetch_failed(self, sf, f):
+        assert sf is self._active_segment
+        self._active_segment = None
+        # deliver error upwards
+        for (d,c) in self._extract_requests(sf.segnum):
+            eventually(self._deliver, d, c, f)
+
+    def process_blocks(self, segnum, blocks):
+        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
+        d.addCallback(self._check_ciphertext_hash, segnum)
+        def _deliver(result):
+            ds = self._download_status
+            if isinstance(result, Failure):
+                ds.add_segment_error(segnum, now())
+            else:
+                (offset, segment, decodetime) = result
+                ds.add_segment_delivery(segnum, now(),
+                                        offset, len(segment), decodetime)
+            log.msg(format="delivering segment(%(segnum)d)",
+                    segnum=segnum,
+                    level=log.OPERATIONAL, parent=self._lp,
+                    umid="j60Ojg")
+            for (d,c) in self._extract_requests(segnum):
+                eventually(self._deliver, d, c, result)
+            self._active_segment = None
+            self._start_new_segment()
+        d.addBoth(_deliver)
+        d.addErrback(lambda f:
+                     log.err("unhandled error during process_blocks",
+                             failure=f, level=log.WEIRD,
+                             parent=self._lp, umid="MkEsCg"))
+
+    def _decode_blocks(self, segnum, blocks):
+        tail = (segnum == self.num_segments-1)
+        codec = self._codec
+        block_size = self.block_size
+        decoded_size = self.segment_size
+        if tail:
+            # account for the padding in the last segment
+            codec = CRSDecoder()
+            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
+            codec.set_params(self.tail_segment_padded, k, N)
+            block_size = self.tail_block_size
+            decoded_size = self.tail_segment_padded
+
+        shares = []
+        shareids = []
+        for (shareid, share) in blocks.iteritems():
+            assert len(share) == block_size
+            shareids.append(shareid)
+            shares.append(share)
+        del blocks
+
+        start = now()
+        d = codec.decode(shares, shareids)   # segment
+        del shares
+        def _process(buffers):
+            decodetime = now() - start
+            segment = "".join(buffers)
+            assert len(segment) == decoded_size
+            del buffers
+            if tail:
+                segment = segment[:self.tail_segment_size]
+            return (segment, decodetime)
+        d.addCallback(_process)
+        return d
+
+    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
+        assert self._active_segment.segnum == segnum
+        assert self.segment_size is not None
+        offset = segnum * self.segment_size
+
+        h = hashutil.crypttext_segment_hash(segment)
+        try:
+            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
+            return (offset, segment, decodetime)
+        except (BadHashError, NotEnoughHashesError):
+            format = ("hash failure in ciphertext_hash_tree:"
+                      " segnum=%(segnum)d, SI=%(si)s")
+            log.msg(format=format, segnum=segnum, si=self._si_prefix,
+                    failure=Failure(),
+                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
+            # this is especially weird, because we made it past the share
+            # hash tree. It implies that we're using the wrong encoding, or
+            # that the uploader deliberately constructed a bad UEB.
+            msg = format % {"segnum": segnum, "si": self._si_prefix}
+            raise BadCiphertextHashError(msg)
+
+    def _deliver(self, d, c, result):
+        # this method exists to handle cancel() that occurs between
+        # _got_segment and _deliver
+        if not c.cancelled:
+            d.callback(result) # might actually be an errback
+
+    def _extract_requests(self, segnum):
+        """Remove matching requests and return their (d,c) tuples so that the
+        caller can retire them."""
+        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
+                  if segnum0 == segnum]
+        self._segment_requests = [t for t in self._segment_requests
+                                  if t[0] != segnum]
+        return retire
+
+    def _cancel_request(self, c):
+        self._segment_requests = [t for t in self._segment_requests
+                                  if t[2] != c]
+        segnums = [segnum for (segnum,d,c) in self._segment_requests]
+        if self._active_segment.segnum not in segnums:
+            self._active_segment.stop()
+            self._active_segment = None
+            self._start_new_segment()
diff --git a/src/allmydata/immutable/downloader/segmentation.py b/src/allmydata/immutable/downloader/segmentation.py
new file mode 100644
index 00000000..4890195c
--- /dev/null
+++ b/src/allmydata/immutable/downloader/segmentation.py
@@ -0,0 +1,160 @@
+
+import time
+now = time.time
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer
+from foolscap.api import eventually
+from allmydata.util import log
+from allmydata.util.spans import overlap
+
+from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
+
+class Segmentation:
+    """I am responsible for a single offset+size read of the file. I handle
+    segmentation: I figure out which segments are necessary, request them
+    (from my CiphertextDownloader) in order, and trim the segments down to
+    match the offset+size span. I use the Producer/Consumer interface to only
+    request one segment at a time.
+    """
+    implements(IPushProducer)
+    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
+        self._node = node
+        self._hungry = True
+        self._active_segnum = None
+        self._cancel_segment_request = None
+        # these are updated as we deliver data. At any given time, we still
+        # want to download file[offset:offset+size]
+        self._offset = offset
+        self._size = size
+        assert offset+size <= node._verifycap.size
+        self._consumer = consumer
+        self._read_ev = read_ev
+        self._start_pause = None
+        self._lp = logparent
+
+    def start(self):
+        self._alive = True
+        self._deferred = defer.Deferred()
+        self._consumer.registerProducer(self, True)
+        self._maybe_fetch_next()
+        return self._deferred
+
+    def _maybe_fetch_next(self):
+        if not self._alive or not self._hungry:
+            return
+        if self._active_segnum is not None:
+            return
+        self._fetch_next()
+
+    def _fetch_next(self):
+        if self._size == 0:
+            # done!
+            self._alive = False
+            self._hungry = False
+            self._consumer.unregisterProducer()
+            self._deferred.callback(self._consumer)
+            return
+        n = self._node
+        have_actual_segment_size = n.segment_size is not None
+        guess_s = ""
+        if not have_actual_segment_size:
+            guess_s = "probably "
+        segment_size = n.segment_size or n.guessed_segment_size
+        if self._offset == 0:
+            # great! we want segment0 for sure
+            wanted_segnum = 0
+        else:
+            # this might be a guess
+            wanted_segnum = self._offset // segment_size
+        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
+                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
+                level=log.NOISY, parent=self._lp, umid="5WfN0w")
+        self._active_segnum = wanted_segnum
+        d,c = n.get_segment(wanted_segnum, self._lp)
+        self._cancel_segment_request = c
+        d.addBoth(self._request_retired)
+        d.addCallback(self._got_segment, wanted_segnum)
+        if not have_actual_segment_size:
+            # we can retry once
+            d.addErrback(self._retry_bad_segment)
+        d.addErrback(self._error)
+
+    def _request_retired(self, res):
+        self._active_segnum = None
+        self._cancel_segment_request = None
+        return res
+
+    def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum):
+        self._cancel_segment_request = None
+        # we got file[segment_start:segment_start+len(segment)]
+        # we want file[self._offset:self._offset+self._size]
+        log.msg(format="Segmentation got data:"
+                " want [%(wantstart)d-%(wantend)d),"
+                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
+                wantstart=self._offset, wantend=self._offset+self._size,
+                segstart=segment_start, segend=segment_start+len(segment),
+                segnum=wanted_segnum,
+                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")
+
+        o = overlap(segment_start, len(segment),  self._offset, self._size)
+        # the overlap is file[o[0]:o[0]+o[1]]
+        if not o or o[0] != self._offset:
+            # we didn't get the first byte, so we can't use this segment
+            log.msg("Segmentation handed wrong data:"
+                    " want [%d-%d), given [%d-%d), for segnum=%d,"
+                    " for si=%s"
+                    % (self._offset, self._offset+self._size,
+                       segment_start, segment_start+len(segment),
+                       wanted_segnum, self._node._si_prefix),
+                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
+            # we may retry if the segnum we asked was based on a guess
+            raise WrongSegmentError("I was given the wrong data.")
+        offset_in_segment = self._offset - segment_start
+        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
+
+        self._offset += len(desired_data)
+        self._size -= len(desired_data)
+        self._consumer.write(desired_data)
+        # the consumer might call our .pauseProducing() inside that write()
+        # call, setting self._hungry=False
+        self._read_ev.update(len(desired_data), 0, 0)
+        self._maybe_fetch_next()
+
+    def _retry_bad_segment(self, f):
+        f.trap(WrongSegmentError, BadSegmentNumberError)
+        # we guessed the segnum wrong: either one that doesn't overlap with
+        # the start of our desired region, or one that's beyond the end of
+        # the world. Now that we have the right information, we're allowed to
+        # retry once.
+        assert self._node.segment_size is not None
+        return self._maybe_fetch_next()
+
+    def _error(self, f):
+        log.msg("Error in Segmentation", failure=f,
+                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
+        self._alive = False
+        self._hungry = False
+        self._consumer.unregisterProducer()
+        self._deferred.errback(f)
+
+    def stopProducing(self):
+        self._hungry = False
+        self._alive = False
+        # cancel any outstanding segment request
+        if self._cancel_segment_request:
+            self._cancel_segment_request.cancel()
+            self._cancel_segment_request = None
+        e = DownloadStopped("our Consumer called stopProducing()")
+        self._deferred.errback(e)
+
+    def pauseProducing(self):
+        self._hungry = False
+        self._start_pause = now()
+    def resumeProducing(self):
+        self._hungry = True
+        eventually(self._maybe_fetch_next)
+        if self._start_pause is not None:
+            paused = now() - self._start_pause
+            self._read_ev.update(0, 0, paused)
+            self._start_pause = None
diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py
new file mode 100644
index 00000000..e3c90176
--- /dev/null
+++ b/src/allmydata/immutable/downloader/share.py
@@ -0,0 +1,848 @@
+
+import struct
+import time
+now = time.time
+
+from twisted.python.failure import Failure
+from foolscap.api import eventually
+from allmydata.util import base32, log, hashutil, mathutil
+from allmydata.util.spans import Spans, DataSpans
+from allmydata.interfaces import HASH_SIZE
+from allmydata.hashtree import IncompleteHashTree, BadHashError, \
+     NotEnoughHashesError
+
+from allmydata.immutable.layout import make_write_bucket_proxy
+from allmydata.util.observer import EventStreamObserver
+from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
+
+
+class LayoutInvalid(Exception):
+    pass
+class DataUnavailable(Exception):
+    pass
+
+class Share:
+    """I represent a single instance of a single share (e.g. I reference the
+    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
+    I am associated with a CommonShare that remembers data that is held in
+    common among e.g. SI=abcde/shnum2 across all servers. I am also
+    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
+    servers).
+    """
+    # this is a specific implementation of IShare for tahoe's native storage
+    # servers. A different backend would use a different class.
+
+    def __init__(self, rref, server_version, verifycap, commonshare, node,
+                 download_status, peerid, shnum, logparent):
+        self._rref = rref
+        self._server_version = server_version
+        self._node = node # holds share_hash_tree and UEB
+        self.actual_segment_size = node.segment_size # might still be None
+        # XXX change node.guessed_segment_size to
+        # node.best_guess_segment_size(), which should give us the real ones
+        # if known, else its guess.
+        self._guess_offsets(verifycap, node.guessed_segment_size)
+        self.actual_offsets = None
+        self._UEB_length = None
+        self._commonshare = commonshare # holds block_hash_tree
+        self._download_status = download_status
+        self._peerid = peerid
+        self._peerid_s = base32.b2a(peerid)[:5]
+        self._storage_index = verifycap.storage_index
+        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
+        self._shnum = shnum
+        # self._alive becomes False upon fatal corruption or server error
+        self._alive = True
+        self._lp = log.msg(format="%(share)s created", share=repr(self),
+                           level=log.NOISY, parent=logparent, umid="P7hv2w")
+
+        self._pending = Spans() # request sent but no response received yet
+        self._received = DataSpans() # ACK response received, with data
+        self._unavailable = Spans() # NAK response received, no data
+
+        # any given byte of the share can be in one of four states:
+        #  in: _wanted, _requested, _received
+        #      FALSE    FALSE       FALSE : don't care about it at all
+        #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
+        #      TRUE     TRUE        FALSE : request is in-flight
+        #                                   or didn't get it
+        #      FALSE    TRUE        TRUE  : got it, haven't used it yet
+        #      FALSE    TRUE        FALSE : got it and used it
+        #      FALSE    FALSE       FALSE : block consumed, ready to ask again
+        #
+        # when we request data and get a NAK, we leave it in _requested
+        # to remind ourself to not ask for it again. We don't explicitly
+        # remove it from anything (maybe this should change).
+        #
+        # We retain the hashtrees in the Node, so we leave those spans in
+        # _requested (and never ask for them again, as long as the Node is
+        # alive). But we don't retain data blocks (too big), so when we
+        # consume a data block, we remove it from _requested, so a later
+        # download can re-fetch it.
+
+        self._requested_blocks = [] # (segnum, set(observer2..))
+        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
+        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
+        # If _overrun_ok and we guess the offsets correctly, we can get
+        # everything in one RTT. If _overrun_ok and we guess wrong, we might
+        # need two RTT (but we could get lucky and do it in one). If overrun
+        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
+        # 2=offset table, 3=UEB_length and everything else (hashes, block),
+        # 4=UEB.
+
+        self.had_corruption = False # for unit tests
+
+    def __repr__(self):
+        return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
+
+    def is_alive(self):
+        # XXX: reconsider. If the share sees a single error, should it remain
+        # dead for all time? Or should the next segment try again? This DEAD
+        # state is stored elsewhere too (SegmentFetcher per-share states?)
+        # and needs to be consistent. We clear _alive in self._fail(), which
+        # is called upon a network error, or layout failure, or hash failure
+        # in the UEB or a hash tree. We do not _fail() for a hash failure in
+        # a block, but of course we still tell our callers about
+        # state=CORRUPT so they'll find a different share.
+        return self._alive
+
+    def _guess_offsets(self, verifycap, guessed_segment_size):
+        self.guessed_segment_size = guessed_segment_size
+        size = verifycap.size
+        k = verifycap.needed_shares
+        N = verifycap.total_shares
+        r = self._node._calculate_sizes(guessed_segment_size)
+        # num_segments, block_size/tail_block_size
+        # guessed_segment_size/tail_segment_size/tail_segment_padded
+        share_size = mathutil.div_ceil(size, k)
+        # share_size is the amount of block data that will be put into each
+        # share, summed over all segments. It does not include hashes, the
+        # UEB, or other overhead.
+
+        # use the upload-side code to get this as accurate as possible
+        ht = IncompleteHashTree(N)
+        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
+        wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
+                                      r["num_segments"], num_share_hashes, 0,
+                                      None)
+        self._fieldsize = wbp.fieldsize
+        self._fieldstruct = wbp.fieldstruct
+        self.guessed_offsets = wbp._offsets
+
+    # called by our client, the SegmentFetcher
+    def get_block(self, segnum):
+        """Add a block number to the list of requests. This will eventually
+        result in a fetch of the data necessary to validate the block, then
+        the block itself. The fetch order is generally
+        first-come-first-served, but requests may be answered out-of-order if
+        data becomes available sooner.
+
+        I return an EventStreamObserver, which has two uses. The first is to
+        call o.subscribe(), which gives me a place to send state changes and
+        eventually the data block. The second is o.cancel(), which removes
+        the request (if it is still active).
+
+        I will distribute the following events through my EventStreamObserver:
+         - state=OVERDUE: ?? I believe I should have had an answer by now.
+                          You may want to ask another share instead.
+         - state=BADSEGNUM: the segnum you asked for is too large. I must
+                            fetch a valid UEB before I can determine this,
+                            so the notification is asynchronous
+         - state=COMPLETE, block=data: here is a valid block
+         - state=CORRUPT: this share contains corrupted data
+         - state=DEAD, f=Failure: the server reported an error, this share
+                                  is unusable
+        """
+        log.msg("%s.get_block(%d)" % (repr(self), segnum),
+                level=log.NOISY, parent=self._lp, umid="RTo9MQ")
+        assert segnum >= 0
+        o = EventStreamObserver()
+        o.set_canceler(self, "_cancel_block_request")
+        for i,(segnum0,observers) in enumerate(self._requested_blocks):
+            if segnum0 == segnum:
+                observers.add(o)
+                break
+        else:
+            self._requested_blocks.append( (segnum, set([o])) )
+        eventually(self.loop)
+        return o
+
+    def _cancel_block_request(self, o):
+        new_requests = []
+        for e in self._requested_blocks:
+            (segnum0, observers) = e
+            observers.discard(o)
+            if observers:
+                new_requests.append(e)
+        self._requested_blocks = new_requests
+
+    # internal methods
+    def _active_segnum_and_observers(self):
+        if self._requested_blocks:
+            # we only retrieve information for one segment at a time, to
+            # minimize alacrity (first come, first served)
+            return self._requested_blocks[0]
+        return None, []
+
+    def loop(self):
+        try:
+            # if any exceptions occur here, kill the download
+            log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
+                    " unavailable=%s" %
+                    (repr(self),
+                     ",".join([str(req[0]) for req in self._requested_blocks]),
+                     self._pending.dump(), self._received.dump(),
+                     self._unavailable.dump() ),
+                    level=log.NOISY, parent=self._lp, umid="BaL1zw")
+            self._do_loop()
+            # all exception cases call self._fail(), which clears self._alive
+        except (BadHashError, NotEnoughHashesError, LayoutInvalid), e:
+            # Abandon this share. We do this if we see corruption in the
+            # offset table, the UEB, or a hash tree. We don't abandon the
+            # whole share if we see corruption in a data block (we abandon
+            # just the one block, and still try to get data from other blocks
+            # on the same server). In theory, we could get good data from a
+            # share with a corrupt UEB (by first getting the UEB from some
+            # other share), or corrupt hash trees, but the logic to decide
+            # when this is safe is non-trivial. So for now, give up at the
+            # first sign of corruption.
+            #
+            # _satisfy_*() code which detects corruption should first call
+            # self._signal_corruption(), and then raise the exception.
+            log.msg(format="corruption detected in %(share)s",
+                    share=repr(self),
+                    level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
+            self._fail(Failure(e), log.UNUSUAL)
+        except DataUnavailable, e:
+            # Abandon this share.
+            log.msg(format="need data that will never be available"
+                    " from %s: pending=%s, received=%s, unavailable=%s" %
+                    (repr(self),
+                     self._pending.dump(), self._received.dump(),
+                     self._unavailable.dump() ),
+                    level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
+            self._fail(Failure(e), log.UNUSUAL)
+        except BaseException:
+            self._fail(Failure())
+            raise
+        log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
+                " unavailable=%s" %
+                (repr(self),
+                 ",".join([str(req[0]) for req in self._requested_blocks]),
+                 self._pending.dump(), self._received.dump(),
+                 self._unavailable.dump() ),
+                level=log.NOISY, parent=self._lp, umid="9lRaRA")
+
+    def _do_loop(self):
+        # we are (eventually) called after all state transitions:
+        #  new segments added to self._requested_blocks
+        #  new data received from servers (responses to our read() calls)
+        #  impatience timer fires (server appears slow)
+        if not self._alive:
+            return
+
+        # First, consume all of the information that we currently have, for
+        # all the segments people currently want.
+        while self._get_satisfaction():
+            pass
+
+        # When we get no satisfaction (from the data we've received so far),
+        # we determine what data we desire (to satisfy more requests). The
+        # number of segments is finite, so I can't get no satisfaction
+        # forever.
+        wanted, needed = self._desire()
+
+        # Finally, send out requests for whatever we need (desire minus
+        # have). You can't always get what you want, but if you try
+        # sometimes, you just might find, you get what you need.
+        self._send_requests(wanted + needed)
+
+        # and sometimes you can't even get what you need
+        disappointment = needed & self._unavailable
+        if len(disappointment):
+            self.had_corruption = True
+            raise DataUnavailable("need %s but will never get it" %
+                                  disappointment.dump())
+
+    def _get_satisfaction(self):
+        # return True if we retired a data block, and should therefore be
+        # called again. Return False if we don't retire a data block (even if
+        # we do retire some other data, like hash chains).
+
+        if self.actual_offsets is None:
+            if not self._satisfy_offsets():
+                # can't even look at anything without the offset table
+                return False
+
+        if not self._node.have_UEB:
+            if not self._satisfy_UEB():
+                # can't check any hashes without the UEB
+                return False
+        self.actual_segment_size = self._node.segment_size # might be updated
+        assert self.actual_segment_size is not None
+
+        # knowing the UEB means knowing num_segments. Despite the redundancy,
+        # this is the best place to set this. CommonShare.set_numsegs will
+        # ignore duplicate calls.
+        assert self._node.num_segments is not None
+        cs = self._commonshare
+        cs.set_numsegs(self._node.num_segments)
+
+        segnum, observers = self._active_segnum_and_observers()
+        # if segnum is None, we don't really need to do anything (we have no
+        # outstanding readers right now), but we'll fill in the bits that
+        # aren't tied to any particular segment.
+
+        if segnum is not None and segnum >= self._node.num_segments:
+            for o in observers:
+                o.notify(state=BADSEGNUM)
+            self._requested_blocks.pop(0)
+            return True
+
+        if self._node.share_hash_tree.needed_hashes(self._shnum):
+            if not self._satisfy_share_hash_tree():
+                # can't check block_hash_tree without a root
+                return False
+
+        if cs.need_block_hash_root():
+            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
+            cs.set_block_hash_root(block_hash_root)
+
+        if segnum is None:
+            return False # we don't want any particular segment right now
+
+        # block_hash_tree
+        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
+        if needed_hashes:
+            if not self._satisfy_block_hash_tree(needed_hashes):
+                # can't check block without block_hash_tree
+                return False
+
+        # ciphertext_hash_tree
+        needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
+        if needed_hashes:
+            if not self._satisfy_ciphertext_hash_tree(needed_hashes):
+                # can't check decoded blocks without ciphertext_hash_tree
+                return False
+
+        # data blocks
+        return self._satisfy_data_block(segnum, observers)
+
+    def _satisfy_offsets(self):
+        version_s = self._received.get(0, 4)
+        if version_s is None:
+            return False
+        (version,) = struct.unpack(">L", version_s)
+        if version == 1:
+            table_start = 0x0c
+            self._fieldsize = 0x4
+            self._fieldstruct = "L"
+        elif version == 2:
+            table_start = 0x14
+            self._fieldsize = 0x8
+            self._fieldstruct = "Q"
+        else:
+            self.had_corruption = True
+            raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
+                                % version)
+        offset_table_size = 6 * self._fieldsize
+        table_s = self._received.pop(table_start, offset_table_size)
+        if table_s is None:
+            return False
+        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
+        offsets = {}
+        for i,field in enumerate(['data',
+                                  'plaintext_hash_tree', # UNUSED
+                                  'crypttext_hash_tree',
+                                  'block_hashes',
+                                  'share_hashes',
+                                  'uri_extension',
+                                  ] ):
+            offsets[field] = fields[i]
+        self.actual_offsets = offsets
+        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
+        self._received.remove(0, 4) # don't need this anymore
+
+        # validate the offsets a bit
+        share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
+        if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
+            # the share hash chain is stored as (hashnum,hash) pairs
+            self.had_corruption = True
+            raise LayoutInvalid("share hashes malformed -- should be a"
+                                " multiple of %d bytes -- not %d" %
+                                (2+HASH_SIZE, share_hashes_size))
+        block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
+        if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
+            # the block hash tree is stored as a list of hashes
+            self.had_corruption = True
+            raise LayoutInvalid("block hashes malformed -- should be a"
+                                " multiple of %d bytes -- not %d" %
+                                (HASH_SIZE, block_hashes_size))
+        # we only look at 'crypttext_hash_tree' if the UEB says we're
+        # actually using it. Same with 'plaintext_hash_tree'. This gives us
+        # some wiggle room: a place to stash data for later extensions.
+
+        return True
+
+    def _satisfy_UEB(self):
+        o = self.actual_offsets
+        fsize = self._fieldsize
+        UEB_length_s = self._received.get(o["uri_extension"], fsize)
+        if not UEB_length_s:
+            return False
+        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
+        UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
+        if not UEB_s:
+            return False
+        self._received.remove(o["uri_extension"], fsize)
+        try:
+            self._node.validate_and_store_UEB(UEB_s)
+            return True
+        except (LayoutInvalid, BadHashError), e:
+            # TODO: if this UEB was bad, we'll keep trying to validate it
+            # over and over again. Only log.err on the first one, or better
+            # yet skip all but the first
+            f = Failure(e)
+            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
+            self.had_corruption = True
+            raise
+
+    def _satisfy_share_hash_tree(self):
+        # the share hash chain is stored as (hashnum,hash) tuples, so you
+        # can't fetch just the pieces you need, because you don't know
+        # exactly where they are. So fetch everything, and parse the results
+        # later.
+        o = self.actual_offsets
+        hashlen = o["uri_extension"] - o["share_hashes"]
+        assert hashlen % (2+HASH_SIZE) == 0
+        hashdata = self._received.get(o["share_hashes"], hashlen)
+        if not hashdata:
+            return False
+        share_hashes = {}
+        for i in range(0, hashlen, 2+HASH_SIZE):
+            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
+            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
+            share_hashes[hashnum] = hashvalue
+        # TODO: if they give us an empty set of hashes,
+        # process_share_hashes() won't fail. We must ensure that this
+        # situation doesn't allow unverified shares through. Manual testing
+        # shows that set_block_hash_root() throws an assert because an
+        # internal node is None instead of an actual hash, but we want
+        # something better. It's probably best to add a method to
+        # IncompleteHashTree which takes a leaf number and raises an
+        # exception unless that leaf is present and fully validated.
+        try:
+            self._node.process_share_hashes(share_hashes)
+            # adds to self._node.share_hash_tree
+        except (BadHashError, NotEnoughHashesError), e:
+            f = Failure(e)
+            self._signal_corruption(f, o["share_hashes"], hashlen)
+            self.had_corruption = True
+            raise
+        self._received.remove(o["share_hashes"], hashlen)
+        return True
+
+    def _signal_corruption(self, f, start, offset):
+        # there was corruption somewhere in the given range
+        reason = "corruption in share[%d-%d): %s" % (start, start+offset,
+                                                     str(f.value))
+        self._rref.callRemoteOnly("advise_corrupt_share", reason)
+
+    def _satisfy_block_hash_tree(self, needed_hashes):
+        o_bh = self.actual_offsets["block_hashes"]
+        block_hashes = {}
+        for hashnum in needed_hashes:
+            hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
+            if hashdata:
+                block_hashes[hashnum] = hashdata
+            else:
+                return False # missing some hashes
+        # note that we don't submit any hashes to the block_hash_tree until
+        # we've gotten them all, because the hash tree will throw an
+        # exception if we only give it a partial set (which it therefore
+        # cannot validate)
+        try:
+            self._commonshare.process_block_hashes(block_hashes)
+        except (BadHashError, NotEnoughHashesError), e:
+            f = Failure(e)
+            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
+            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
+                    " from %(share)s",
+                    hashnums=hashnums, shnum=self._shnum, share=repr(self),
+                    failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
+            hsize = max(0, max(needed_hashes)) * HASH_SIZE
+            self._signal_corruption(f, o_bh, hsize)
+            self.had_corruption = True
+            raise
+        for hashnum in needed_hashes:
+            self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
+        return True
+
+    def _satisfy_ciphertext_hash_tree(self, needed_hashes):
+        start = self.actual_offsets["crypttext_hash_tree"]
+        hashes = {}
+        for hashnum in needed_hashes:
+            hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
+            if hashdata:
+                hashes[hashnum] = hashdata
+            else:
+                return False # missing some hashes
+        # we don't submit any hashes to the ciphertext_hash_tree until we've
+        # gotten them all
+        try:
+            self._node.process_ciphertext_hashes(hashes)
+        except (BadHashError, NotEnoughHashesError), e:
+            f = Failure(e)
+            hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
+            log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
+                    " from %(share)s",
+                    hashnums=hashnums, share=repr(self), failure=f,
+                    level=log.WEIRD, parent=self._lp, umid="iZI0TA")
+            hsize = max(0, max(needed_hashes))*HASH_SIZE
+            self._signal_corruption(f, start, hsize)
+            self.had_corruption = True
+            raise
+        for hashnum in needed_hashes:
+            self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
+        return True
+
+    def _satisfy_data_block(self, segnum, observers):
+        tail = (segnum == self._node.num_segments-1)
+        datastart = self.actual_offsets["data"]
+        blockstart = datastart + segnum * self._node.block_size
+        blocklen = self._node.block_size
+        if tail:
+            blocklen = self._node.tail_block_size
+
+        block = self._received.pop(blockstart, blocklen)
+        if not block:
+            log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
+                                                              blockstart, blocklen))
+            return False
+        log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
+                share=repr(self), start=blockstart, length=blocklen,
+                level=log.NOISY, parent=self._lp, umid="uTDNZg")
+        # this block is being retired, either as COMPLETE or CORRUPT, since
+        # no further data reads will help
+        assert self._requested_blocks[0][0] == segnum
+        try:
+            self._commonshare.check_block(segnum, block)
+            # hurrah, we have a valid block. Deliver it.
+            for o in observers:
+                # goes to SegmentFetcher._block_request_activity
+                o.notify(state=COMPLETE, block=block)
+        except (BadHashError, NotEnoughHashesError), e:
+            # rats, we have a corrupt block. Notify our clients that they
+            # need to look elsewhere, and advise the server. Unlike
+            # corruption in other parts of the share, this doesn't cause us
+            # to abandon the whole share.
+            f = Failure(e)
+            log.msg(format="hash failure in block %(segnum)d, from %(share)s",
+                    segnum=segnum, share=repr(self), failure=f,
+                    level=log.WEIRD, parent=self._lp, umid="mZjkqA")
+            for o in observers:
+                o.notify(state=CORRUPT)
+            self._signal_corruption(f, blockstart, blocklen)
+            self.had_corruption = True
+        # in either case, we've retired this block
+        self._requested_blocks.pop(0)
+        # popping the request keeps us from turning around and wanting the
+        # block again right away
+        return True # got satisfaction
+
+    def _desire(self):
+        segnum, observers = self._active_segnum_and_observers() # maybe None
+
+        # 'want_it' is for data we merely want: we know that we don't really
+        # need it. This includes speculative reads, like the first 1KB of the
+        # share (for the offset table) and the first 2KB of the UEB.
+        #
+        # 'need_it' is for data that, if we have the real offset table, we'll
+        # need. If we are only guessing at the offset table, it's merely
+        # wanted. (The share is abandoned if we can't get data that we really
+        # need).
+        #
+        # 'gotta_gotta_have_it' is for data that we absolutely need,
+        # independent of whether we're still guessing about the offset table:
+        # the version number and the offset table itself.
+        #
+        # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..
+
+        desire = Spans(), Spans(), Spans()
+        (want_it, need_it, gotta_gotta_have_it) = desire
+
+        self.actual_segment_size = self._node.segment_size # might be updated
+        o = self.actual_offsets or self.guessed_offsets
+        segsize = self.actual_segment_size or self.guessed_segment_size
+        r = self._node._calculate_sizes(segsize)
+
+        if not self.actual_offsets:
+            # all _desire functions add bits to the three desire[] spans
+            self._desire_offsets(desire)
+
+        # we can use guessed offsets as long as this server tolerates
+        # overrun. Otherwise, we must wait for the offsets to arrive before
+        # we try to read anything else.
+        if self.actual_offsets or self._overrun_ok:
+            if not self._node.have_UEB:
+                self._desire_UEB(desire, o)
+            # They might ask for a segment that doesn't look right.
+            # _satisfy() will catch+reject bad segnums once we know the UEB
+            # (and therefore segsize and numsegs), so we'll only fail this
+            # test if we're still guessing. We want to avoid asking the
+            # hashtrees for needed_hashes() for bad segnums. So don't enter
+            # _desire_hashes or _desire_data unless the segnum looks
+            # reasonable.
+            if segnum < r["num_segments"]:
+                # XXX somehow we're getting here for sh5. we don't yet know
+                # the actual_segment_size, we're still working off the guess.
+                # the ciphertext_hash_tree has been corrected, but the
+                # commonshare._block_hash_tree is still in the guessed state.
+                self._desire_share_hashes(desire, o)
+                if segnum is not None:
+                    self._desire_block_hashes(desire, o, segnum)
+                    self._desire_data(desire, o, r, segnum, segsize)
+            else:
+                log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
+                        % (segnum, r["num_segments"]),
+                        level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+
+        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
+                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
+        if self.actual_offsets:
+            return (want_it, need_it+gotta_gotta_have_it)
+        else:
+            return (want_it+need_it, gotta_gotta_have_it)
+
+    def _desire_offsets(self, desire):
+        (want_it, need_it, gotta_gotta_have_it) = desire
+        if self._overrun_ok:
+            # easy! this includes version number, sizes, and offsets
+            want_it.add(0, 1024)
+            return
+
+        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
+        # To be conservative, only request the data that we know lives there,
+        # even if that means more roundtrips.
+
+        gotta_gotta_have_it.add(0, 4)  # version number, always safe
+        version_s = self._received.get(0, 4)
+        if not version_s:
+            return
+        (version,) = struct.unpack(">L", version_s)
+        # The code in _satisfy_offsets will have checked this version
+        # already. There is no code path to get this far with version>2.
+        assert 1 <= version <= 2, "can't get here, version=%d" % version
+        if version == 1:
+            table_start = 0x0c
+            fieldsize = 0x4
+        elif version == 2:
+            table_start = 0x14
+            fieldsize = 0x8
+        offset_table_size = 6 * fieldsize
+        gotta_gotta_have_it.add(table_start, offset_table_size)
+
+    def _desire_UEB(self, desire, o):
+        (want_it, need_it, gotta_gotta_have_it) = desire
+
+        # UEB data is stored as (length,data).
+        if self._overrun_ok:
+            # We can pre-fetch 2kb, which should probably cover it. If it
+            # turns out to be larger, we'll come back here later with a known
+            # length and fetch the rest.
+            want_it.add(o["uri_extension"], 2048)
+            # now, while that is probably enough to fetch the whole UEB, it
+            # might not be, so we need to do the next few steps as well. In
+            # most cases, the following steps will not actually add anything
+            # to need_it
+
+        need_it.add(o["uri_extension"], self._fieldsize)
+        # only use a length if we're sure it's correct, otherwise we'll
+        # probably fetch a huge number
+        if not self.actual_offsets:
+            return
+        UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
+        if UEB_length_s:
+            (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
+            # we know the length, so make sure we grab everything
+            need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
+
+    def _desire_share_hashes(self, desire, o):
+        (want_it, need_it, gotta_gotta_have_it) = desire
+
+        if self._node.share_hash_tree.needed_hashes(self._shnum):
+            hashlen = o["uri_extension"] - o["share_hashes"]
+            need_it.add(o["share_hashes"], hashlen)
+
+    def _desire_block_hashes(self, desire, o, segnum):
+        (want_it, need_it, gotta_gotta_have_it) = desire
+
+        # block hash chain
+        for hashnum in self._commonshare.get_needed_block_hashes(segnum):
+            need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
+
+        # ciphertext hash chain
+        for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
+            need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
+
+    def _desire_data(self, desire, o, r, segnum, segsize):
+        (want_it, need_it, gotta_gotta_have_it) = desire
+        tail = (segnum == r["num_segments"]-1)
+        datastart = o["data"]
+        blockstart = datastart + segnum * r["block_size"]
+        blocklen = r["block_size"]
+        if tail:
+            blocklen = r["tail_block_size"]
+        need_it.add(blockstart, blocklen)
+
+    def _send_requests(self, desired):
+        ask = desired - self._pending - self._received.get_spans()
+        log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
+                (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
+                level=log.NOISY, parent=self._lp, umid="E94CVA")
+        # XXX At one time, this code distinguished between data blocks and
+        # hashes, and made sure to send (small) requests for hashes before
+        # sending (big) requests for blocks. The idea was to make sure that
+        # all hashes arrive before the blocks, so the blocks can be consumed
+        # and released in a single turn. I removed this for simplicity.
+        # Reconsider the removal: maybe bring it back.
+        ds = self._download_status
+
+        for (start, length) in ask:
+            # TODO: quantize to reasonably-large blocks
+            self._pending.add(start, length)
+            lp = log.msg(format="%(share)s._send_request"
+                         " [%(start)d:+%(length)d]",
+                         share=repr(self),
+                         start=start, length=length,
+                         level=log.NOISY, parent=self._lp, umid="sgVAyA")
+            req_ev = ds.add_request_sent(self._peerid, self._shnum,
+                                         start, length, now())
+            d = self._send_request(start, length)
+            d.addCallback(self._got_data, start, length, req_ev, lp)
+            d.addErrback(self._got_error, start, length, req_ev, lp)
+            d.addCallback(self._trigger_loop)
+            d.addErrback(lambda f:
+                         log.err(format="unhandled error during send_request",
+                                 failure=f, parent=self._lp,
+                                 level=log.WEIRD, umid="qZu0wg"))
+
+    def _send_request(self, start, length):
+        return self._rref.callRemote("read", start, length)
+
+    def _got_data(self, data, start, length, req_ev, lp):
+        req_ev.finished(len(data), now())
+        if not self._alive:
+            return
+        log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
+                share=repr(self), start=start, length=length, datalen=len(data),
+                level=log.NOISY, parent=lp, umid="5Qn6VQ")
+        self._pending.remove(start, length)
+        self._received.add(start, data)
+
+        # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
+        # never going to get [b:c]. If we really need that data, this block
+        # will never complete. The easiest way to get into this situation is
+        # to hit a share with a corrupted offset table, or one that's somehow
+        # been truncated. On the other hand, when overrun_ok is true, we ask
+        # for data beyond the end of the share all the time (it saves some
+        # RTT when we don't know the length of the share ahead of time). So
+        # not every asked-for-but-not-received byte is fatal.
+        if len(data) < length:
+            self._unavailable.add(start+len(data), length-len(data))
+
+        # XXX if table corruption causes our sections to overlap, then one
+        # consumer (i.e. block hash tree) will pop/remove the data that
+        # another consumer (i.e. block data) mistakenly thinks it needs. It
+        # won't ask for that data again, because the span is in
+        # self._requested. But that span won't be in self._unavailable
+        # because we got it back from the server. TODO: handle this properly
+        # (raise DataUnavailable). Then add sanity-checking
+        # no-overlaps-allowed tests to the offset-table unpacking code to
+        # catch this earlier. XXX
+
+        # accumulate a wanted/needed span (not as self._x, but passed into
+        # desire* functions). manage a pending/in-flight list. when the
+        # requests are sent out, empty/discard the wanted/needed span and
+        # populate/augment the pending list. when the responses come back,
+        # augment either received+data or unavailable.
+
+        # if a corrupt offset table results in double-usage, we'll send
+        # double requests.
+
+        # the wanted/needed span is only "wanted" for the first pass. Once
+        # the offset table arrives, it's all "needed".
+
+    def _got_error(self, f, start, length, req_ev, lp):
+        req_ev.finished("error", now())
+        log.msg(format="error requesting %(start)d+%(length)d"
+                " from %(server)s for si %(si)s",
+                start=start, length=length,
+                server=self._peerid_s, si=self._si_prefix,
+                failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
+        # retire our observers, assuming we won't be able to make any
+        # further progress
+        self._fail(f, log.UNUSUAL)
+
+    def _trigger_loop(self, res):
+        if self._alive:
+            eventually(self.loop)
+        return res
+
+    def _fail(self, f, level=log.WEIRD):
+        log.msg(format="abandoning %(share)s",
+                share=repr(self), failure=f,
+                level=level, parent=self._lp, umid="JKM2Og")
+        self._alive = False
+        for (segnum, observers) in self._requested_blocks:
+            for o in observers:
+                o.notify(state=DEAD, f=f)
+
+
+class CommonShare:
+    """I hold data that is common across all instances of a single share,
+    like sh2 on both servers A and B. This is just the block hash tree.
+    """
+    def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
+        self.si_prefix = si_prefix
+        self.shnum = shnum
+        # in the beginning, before we have the real UEB, we can only guess at
+        # the number of segments. But we want to ask for block hashes early.
+        # So if we're asked for which block hashes are needed before we know
+        # numsegs for sure, we return a guess.
+        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
+        self._know_numsegs = False
+        self._logparent = logparent
+
+    def set_numsegs(self, numsegs):
+        if self._know_numsegs:
+            return
+        self._block_hash_tree = IncompleteHashTree(numsegs)
+        self._know_numsegs = True
+
+    def need_block_hash_root(self):
+        return bool(not self._block_hash_tree[0])
+
+    def set_block_hash_root(self, roothash):
+        assert self._know_numsegs
+        self._block_hash_tree.set_hashes({0: roothash})
+
+    def get_needed_block_hashes(self, segnum):
+        # XXX: include_leaf=True needs thought: how did the old downloader do
+        # it? I think it grabbed *all* block hashes and set them all at once.
+        # Since we want to fetch less data, we either need to fetch the leaf
+        # too, or wait to set the block hashes until we've also received the
+        # block itself, so we can hash it too, and set the chain+leaf all at
+        # the same time.
+        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
+
+    def process_block_hashes(self, block_hashes):
+        assert self._know_numsegs
+        # this may raise BadHashError or NotEnoughHashesError
+        self._block_hash_tree.set_hashes(block_hashes)
+
+    def check_block(self, segnum, block):
+        assert self._know_numsegs
+        h = hashutil.block_hash(block)
+        # this may raise BadHashError or NotEnoughHashesError
+        self._block_hash_tree.set_hashes(leaves={segnum: h})
diff --git a/src/allmydata/immutable/downloader/status.py b/src/allmydata/immutable/downloader/status.py
new file mode 100644
index 00000000..5d60db0d
--- /dev/null
+++ b/src/allmydata/immutable/downloader/status.py
@@ -0,0 +1,170 @@
+
+import itertools
+from zope.interface import implements
+from allmydata.interfaces import IDownloadStatus
+
+class RequestEvent:
+    def __init__(self, download_status, tag):
+        self._download_status = download_status
+        self._tag = tag
+    def finished(self, received, when):
+        self._download_status.add_request_finished(self._tag, received, when)
+
+class DYHBEvent:
+    def __init__(self, download_status, tag):
+        self._download_status = download_status
+        self._tag = tag
+    def finished(self, shnums, when):
+        self._download_status.add_dyhb_finished(self._tag, shnums, when)
+
+class ReadEvent:
+    def __init__(self, download_status, tag):
+        self._download_status = download_status
+        self._tag = tag
+    def update(self, bytes, decrypttime, pausetime):
+        self._download_status.update_read_event(self._tag, bytes,
+                                                decrypttime, pausetime)
+    def finished(self, finishtime):
+        self._download_status.finish_read_event(self._tag, finishtime)
+
+class DownloadStatus:
+    # There is one DownloadStatus for each CiphertextFileNode. The status
+    # object will keep track of all activity for that node.
+    implements(IDownloadStatus)
+    statusid_counter = itertools.count(0)
+
+    def __init__(self, storage_index, size):
+        self.storage_index = storage_index
+        self.size = size
+        self.counter = self.statusid_counter.next()
+        self.helper = False
+        self.started = None
+        # self.dyhb_requests tracks "do you have a share" requests and
+        # responses. It maps serverid to a tuple of:
+        #  send time
+        #  tuple of response shnums (None if response hasn't arrived, "error")
+        #  response time (None if response hasn't arrived yet)
+        self.dyhb_requests = {}
+
+        # self.requests tracks share-data requests and responses. It maps
+        # serverid to a tuple of:
+        #  shnum,
+        #  start,length,  (of data requested)
+        #  send time
+        #  response length (None if reponse hasn't arrived yet, or "error")
+        #  response time (None if response hasn't arrived)
+        self.requests = {}
+
+        # self.segment_events tracks segment requests and delivery. It is a
+        # list of:
+        #  type ("request", "delivery", "error")
+        #  segment number
+        #  event time
+        #  segment start (file offset of first byte, None except in "delivery")
+        #  segment length (only in "delivery")
+        #  time spent in decode (only in "delivery")
+        self.segment_events = []
+
+        # self.read_events tracks read() requests. It is a list of:
+        #  start,length  (of data requested)
+        #  request time
+        #  finish time (None until finished)
+        #  bytes returned (starts at 0, grows as segments are delivered)
+        #  time spent in decrypt (None for ciphertext-only reads)
+        #  time spent paused
+        self.read_events = []
+
+        self.known_shares = [] # (serverid, shnum)
+        self.problems = []
+
+
+    def add_dyhb_sent(self, serverid, when):
+        r = (when, None, None)
+        if serverid not in self.dyhb_requests:
+            self.dyhb_requests[serverid] = []
+        self.dyhb_requests[serverid].append(r)
+        tag = (serverid, len(self.dyhb_requests[serverid])-1)
+        return DYHBEvent(self, tag)
+
+    def add_dyhb_finished(self, tag, shnums, when):
+        # received="error" on error, else tuple(shnums)
+        (serverid, index) = tag
+        r = self.dyhb_requests[serverid][index]
+        (sent, _, _) = r
+        r = (sent, shnums, when)
+        self.dyhb_requests[serverid][index] = r
+
+    def add_request_sent(self, serverid, shnum, start, length, when):
+        r = (shnum, start, length, when, None, None)
+        if serverid not in self.requests:
+            self.requests[serverid] = []
+        self.requests[serverid].append(r)
+        tag = (serverid, len(self.requests[serverid])-1)
+        return RequestEvent(self, tag)
+
+    def add_request_finished(self, tag, received, when):
+        # received="error" on error, else len(data)
+        (serverid, index) = tag
+        r = self.requests[serverid][index]
+        (shnum, start, length, sent, _, _) = r
+        r = (shnum, start, length, sent, received, when)
+        self.requests[serverid][index] = r
+
+    def add_segment_request(self, segnum, when):
+        if self.started is None:
+            self.started = when
+        r = ("request", segnum, when, None, None, None)
+        self.segment_events.append(r)
+    def add_segment_delivery(self, segnum, when, start, length, decodetime):
+        r = ("delivery", segnum, when, start, length, decodetime)
+        self.segment_events.append(r)
+    def add_segment_error(self, segnum, when):
+        r = ("error", segnum, when, None, None, None)
+        self.segment_events.append(r)
+
+    def add_read_event(self, start, length, when):
+        if self.started is None:
+            self.started = when
+        r = (start, length, when, None, 0, 0, 0)
+        self.read_events.append(r)
+        tag = len(self.read_events)-1
+        return ReadEvent(self, tag)
+    def update_read_event(self, tag, bytes_d, decrypt_d, paused_d):
+        r = self.read_events[tag]
+        (start, length, requesttime, finishtime, bytes, decrypt, paused) = r
+        bytes += bytes_d
+        decrypt += decrypt_d
+        paused += paused_d
+        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
+        self.read_events[tag] = r
+    def finish_read_event(self, tag, finishtime):
+        r = self.read_events[tag]
+        (start, length, requesttime, _, bytes, decrypt, paused) = r
+        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
+        self.read_events[tag] = r
+
+    def add_known_share(self, serverid, shnum):
+        self.known_shares.append( (serverid, shnum) )
+
+    def add_problem(self, p):
+        self.problems.append(p)
+
+    # IDownloadStatus methods
+    def get_counter(self):
+        return self.counter
+    def get_storage_index(self):
+        return self.storage_index
+    def get_size(self):
+        return self.size
+    def get_status(self):
+        return "not impl yet" # TODO
+    def get_progress(self):
+        return 0.1 # TODO
+    def using_helper(self):
+        return False
+    def get_active(self):
+        return False # TODO
+    def get_started(self):
+        return self.started
+    def get_results(self):
+        return None # TODO
-- 
2.45.2