Rewrite immutable downloader (#798). This patch rearranges the rest of src/allmydata...
authorBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:26:39 +0000 (00:26 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:26:39 +0000 (00:26 -0700)
src/allmydata/immutable/checker.py
src/allmydata/immutable/filenode.py
src/allmydata/immutable/layout.py
src/allmydata/immutable/literal.py [new file with mode: 0644]
src/allmydata/immutable/repairer.py
src/allmydata/immutable/upload.py

index 2f2d8f120cc0790536aa64414723917809c350aa..cd5c5568dae4cdb671042472cb598b23a20e29fb 100644 (file)
+from zope.interface import implements
+from twisted.internet import defer
 from foolscap.api import DeadReferenceError, RemoteException
+from allmydata import hashtree, codec, uri
+from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
 from allmydata.hashtree import IncompleteHashTree
 from allmydata.check_results import CheckResults
-from allmydata.immutable import download
 from allmydata.uri import CHKFileVerifierURI
 from allmydata.util.assertutil import precondition
-from allmydata.util import base32, idlib, deferredutil, dictutil, log
+from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil
 from allmydata.util.hashutil import file_renewal_secret_hash, \
      file_cancel_secret_hash, bucket_renewal_secret_hash, \
-     bucket_cancel_secret_hash
+     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
+     block_hash
 
 from allmydata.immutable import layout
 
+class IntegrityCheckReject(Exception):
+    pass
+class BadURIExtension(IntegrityCheckReject):
+    pass
+class BadURIExtensionHashValue(IntegrityCheckReject):
+    pass
+class BadOrMissingHash(IntegrityCheckReject):
+    pass
+class UnsupportedErasureCodec(BadURIExtension):
+    pass
+
+class ValidatedExtendedURIProxy:
+    implements(IValidatedThingProxy)
+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
+    responsible for retrieving and validating the elements from the UEB."""
+
+    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
+        # fetch_failures is for debugging -- see test_encode.py
+        self._fetch_failures = fetch_failures
+        self._readbucketproxy = readbucketproxy
+        precondition(IVerifierURI.providedBy(verifycap), verifycap)
+        self._verifycap = verifycap
+
+        # required
+        self.segment_size = None
+        self.crypttext_root_hash = None
+        self.share_root_hash = None
+
+        # computed
+        self.block_size = None
+        self.share_size = None
+        self.num_segments = None
+        self.tail_data_size = None
+        self.tail_segment_size = None
+
+        # optional
+        self.crypttext_hash = None
+
+    def __str__(self):
+        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
+
+    def _check_integrity(self, data):
+        h = uri_extension_hash(data)
+        if h != self._verifycap.uri_extension_hash:
+            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
+                   (self._readbucketproxy,
+                    base32.b2a(self._verifycap.uri_extension_hash),
+                    base32.b2a(h)))
+            if self._fetch_failures is not None:
+                self._fetch_failures["uri_extension"] += 1
+            raise BadURIExtensionHashValue(msg)
+        else:
+            return data
+
+    def _parse_and_validate(self, data):
+        self.share_size = mathutil.div_ceil(self._verifycap.size,
+                                            self._verifycap.needed_shares)
+
+        d = uri.unpack_extension(data)
+
+        # There are several kinds of things that can be found in a UEB.
+        # First, things that we really need to learn from the UEB in order to
+        # do this download. Next: things which are optional but not redundant
+        # -- if they are present in the UEB they will get used. Next, things
+        # that are optional and redundant. These things are required to be
+        # consistent: they don't have to be in the UEB, but if they are in
+        # the UEB then they will be checked for consistency with the
+        # already-known facts, and if they are inconsistent then an exception
+        # will be raised. These things aren't actually used -- they are just
+        # tested for consistency and ignored. Finally: things which are
+        # deprecated -- they ought not be in the UEB at all, and if they are
+        # present then a warning will be logged but they are otherwise
+        # ignored.
+
+        # First, things that we really need to learn from the UEB:
+        # segment_size, crypttext_root_hash, and share_root_hash.
+        self.segment_size = d['segment_size']
+
+        self.block_size = mathutil.div_ceil(self.segment_size,
+                                            self._verifycap.needed_shares)
+        self.num_segments = mathutil.div_ceil(self._verifycap.size,
+                                              self.segment_size)
+
+        self.tail_data_size = self._verifycap.size % self.segment_size
+        if not self.tail_data_size:
+            self.tail_data_size = self.segment_size
+        # padding for erasure code
+        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
+                                                        self._verifycap.needed_shares)
+
+        # Ciphertext hash tree root is mandatory, so that there is at most
+        # one ciphertext that matches this read-cap or verify-cap. The
+        # integrity check on the shares is not sufficient to prevent the
+        # original encoder from creating some shares of file A and other
+        # shares of file B.
+        self.crypttext_root_hash = d['crypttext_root_hash']
+
+        self.share_root_hash = d['share_root_hash']
+
+
+        # Next: things that are optional and not redundant: crypttext_hash
+        if d.has_key('crypttext_hash'):
+            self.crypttext_hash = d['crypttext_hash']
+            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
+                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
+
+
+        # Next: things that are optional, redundant, and required to be
+        # consistent: codec_name, codec_params, tail_codec_params,
+        # num_segments, size, needed_shares, total_shares
+        if d.has_key('codec_name'):
+            if d['codec_name'] != "crs":
+                raise UnsupportedErasureCodec(d['codec_name'])
+
+        if d.has_key('codec_params'):
+            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
+            if ucpss != self.segment_size:
+                raise BadURIExtension("inconsistent erasure code params: "
+                                      "ucpss: %s != self.segment_size: %s" %
+                                      (ucpss, self.segment_size))
+            if ucpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
+                                      "self._verifycap.needed_shares: %s" %
+                                      (ucpns, self._verifycap.needed_shares))
+            if ucpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
+                                      "self._verifycap.total_shares: %s" %
+                                      (ucpts, self._verifycap.total_shares))
+
+        if d.has_key('tail_codec_params'):
+            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
+            if utcpss != self.tail_segment_size:
+                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
+                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
+                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
+                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
+                                         self.segment_size, self._verifycap.needed_shares))
+            if utcpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
+                                      "self._verifycap.needed_shares: %s" % (utcpns,
+                                                                             self._verifycap.needed_shares))
+            if utcpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
+                                      "self._verifycap.total_shares: %s" % (utcpts,
+                                                                            self._verifycap.total_shares))
+
+        if d.has_key('num_segments'):
+            if d['num_segments'] != self.num_segments:
+                raise BadURIExtension("inconsistent num_segments: size: %s, "
+                                      "segment_size: %s, computed_num_segments: %s, "
+                                      "ueb_num_segments: %s" % (self._verifycap.size,
+                                                                self.segment_size,
+                                                                self.num_segments, d['num_segments']))
+
+        if d.has_key('size'):
+            if d['size'] != self._verifycap.size:
+                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
+                                      (self._verifycap.size, d['size']))
+
+        if d.has_key('needed_shares'):
+            if d['needed_shares'] != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
+                                      "needed shares: %s" % (self._verifycap.total_shares,
+                                                             d['needed_shares']))
+
+        if d.has_key('total_shares'):
+            if d['total_shares'] != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
+                                      "total shares: %s" % (self._verifycap.total_shares,
+                                                            d['total_shares']))
+
+        # Finally, things that are deprecated and ignored: plaintext_hash,
+        # plaintext_root_hash
+        if d.get('plaintext_hash'):
+            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
+                    "and is no longer used.  Ignoring.  %s" % (self,))
+        if d.get('plaintext_root_hash'):
+            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
+                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
+
+        return self
+
+    def start(self):
+        """Fetch the UEB from bucket, compare its hash to the hash from
+        verifycap, then parse it. Returns a deferred which is called back
+        with self once the fetch is successful, or is erred back if it
+        fails."""
+        d = self._readbucketproxy.get_uri_extension()
+        d.addCallback(self._check_integrity)
+        d.addCallback(self._parse_and_validate)
+        return d
+
+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
+    """I am a front-end for a remote storage bucket, responsible for
+    retrieving and validating data from that bucket.
+
+    My get_block() method is used by BlockDownloaders.
+    """
+
+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
+                 block_size, share_size):
+        """ share_hash_tree is required to have already been initialized with
+        the root hash (the number-0 hash), using the share_root_hash from the
+        UEB"""
+        precondition(share_hash_tree[0] is not None, share_hash_tree)
+        prefix = "%d-%s-%s" % (sharenum, bucket,
+                               base32.b2a_l(share_hash_tree[0][:8], 60))
+        log.PrefixingLogMixin.__init__(self,
+                                       facility="tahoe.immutable.download",
+                                       prefix=prefix)
+        self.sharenum = sharenum
+        self.bucket = bucket
+        self.share_hash_tree = share_hash_tree
+        self.num_blocks = num_blocks
+        self.block_size = block_size
+        self.share_size = share_size
+        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
+
+    def get_all_sharehashes(self):
+        """Retrieve and validate all the share-hash-tree nodes that are
+        included in this share, regardless of whether we need them to
+        validate the share or not. Each share contains a minimal Merkle tree
+        chain, but there is lots of overlap, so usually we'll be using hashes
+        from other shares and not reading every single hash from this share.
+        The Verifier uses this function to read and validate every single
+        hash from this share.
+
+        Call this (and wait for the Deferred it returns to fire) before
+        calling get_block() for the first time: this lets us check that the
+        share share contains enough hashes to validate its own data, and
+        avoids downloading any share hash twice.
+
+        I return a Deferred which errbacks upon failure, probably with
+        BadOrMissingHash."""
+
+        d = self.bucket.get_share_hashes()
+        def _got_share_hashes(sh):
+            sharehashes = dict(sh)
+            try:
+                self.share_hash_tree.set_hashes(sharehashes)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_share_hashes)
+        return d
+
+    def get_all_blockhashes(self):
+        """Retrieve and validate all the block-hash-tree nodes that are
+        included in this share. Each share contains a full Merkle tree, but
+        we usually only fetch the minimal subset necessary for any particular
+        block. This function fetches everything at once. The Verifier uses
+        this function to validate the block hash tree.
+
+        Call this (and wait for the Deferred it returns to fire) after
+        calling get_all_sharehashes() and before calling get_block() for the
+        first time: this lets us check that the share contains all block
+        hashes and avoids downloading them multiple times.
+
+        I return a Deferred which errbacks upon failure, probably with
+        BadOrMissingHash.
+        """
+
+        # get_block_hashes(anything) currently always returns everything
+        needed = list(range(len(self.block_hash_tree)))
+        d = self.bucket.get_block_hashes(needed)
+        def _got_block_hashes(blockhashes):
+            if len(blockhashes) < len(self.block_hash_tree):
+                raise BadOrMissingHash()
+            bh = dict(enumerate(blockhashes))
+
+            try:
+                self.block_hash_tree.set_hashes(bh)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_block_hashes)
+        return d
+
+    def get_all_crypttext_hashes(self, crypttext_hash_tree):
+        """Retrieve and validate all the crypttext-hash-tree nodes that are
+        in this share. Normally we don't look at these at all: the download
+        process fetches them incrementally as needed to validate each segment
+        of ciphertext. But this is a convenient place to give the Verifier a
+        function to validate all of these at once.
+
+        Call this with a new hashtree object for each share, initialized with
+        the crypttext hash tree root. I return a Deferred which errbacks upon
+        failure, probably with BadOrMissingHash.
+        """
+
+        # get_crypttext_hashes() always returns everything
+        d = self.bucket.get_crypttext_hashes()
+        def _got_crypttext_hashes(hashes):
+            if len(hashes) < len(crypttext_hash_tree):
+                raise BadOrMissingHash()
+            ct_hashes = dict(enumerate(hashes))
+            try:
+                crypttext_hash_tree.set_hashes(ct_hashes)
+            except IndexError, le:
+                raise BadOrMissingHash(le)
+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+                raise BadOrMissingHash(le)
+        d.addCallback(_got_crypttext_hashes)
+        return d
+
+    def get_block(self, blocknum):
+        # the first time we use this bucket, we need to fetch enough elements
+        # of the share hash tree to validate it from our share hash up to the
+        # hashroot.
+        if self.share_hash_tree.needed_hashes(self.sharenum):
+            d1 = self.bucket.get_share_hashes()
+        else:
+            d1 = defer.succeed([])
+
+        # We might need to grab some elements of our block hash tree, to
+        # validate the requested block up to the share hash.
+        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
+        # We don't need the root of the block hash tree, as that comes in the
+        # share tree.
+        blockhashesneeded.discard(0)
+        d2 = self.bucket.get_block_hashes(blockhashesneeded)
+
+        if blocknum < self.num_blocks-1:
+            thisblocksize = self.block_size
+        else:
+            thisblocksize = self.share_size % self.block_size
+            if thisblocksize == 0:
+                thisblocksize = self.block_size
+        d3 = self.bucket.get_block_data(blocknum,
+                                        self.block_size, thisblocksize)
+
+        dl = deferredutil.gatherResults([d1, d2, d3])
+        dl.addCallback(self._got_data, blocknum)
+        return dl
+
+    def _got_data(self, results, blocknum):
+        precondition(blocknum < self.num_blocks,
+                     self, blocknum, self.num_blocks)
+        sharehashes, blockhashes, blockdata = results
+        try:
+            sharehashes = dict(sharehashes)
+        except ValueError, le:
+            le.args = tuple(le.args + (sharehashes,))
+            raise
+        blockhashes = dict(enumerate(blockhashes))
+
+        candidate_share_hash = None # in case we log it in the except block below
+        blockhash = None # in case we log it in the except block below
+
+        try:
+            if self.share_hash_tree.needed_hashes(self.sharenum):
+                # This will raise exception if the values being passed do not
+                # match the root node of self.share_hash_tree.
+                try:
+                    self.share_hash_tree.set_hashes(sharehashes)
+                except IndexError, le:
+                    # Weird -- sharehashes contained index numbers outside of
+                    # the range that fit into this hash tree.
+                    raise BadOrMissingHash(le)
+
+            # To validate a block we need the root of the block hash tree,
+            # which is also one of the leafs of the share hash tree, and is
+            # called "the share hash".
+            if not self.block_hash_tree[0]: # empty -- no root node yet
+                # Get the share hash from the share hash tree.
+                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
+                if not share_hash:
+                    # No root node in block_hash_tree and also the share hash
+                    # wasn't sent by the server.
+                    raise hashtree.NotEnoughHashesError
+                self.block_hash_tree.set_hashes({0: share_hash})
+
+            if self.block_hash_tree.needed_hashes(blocknum):
+                self.block_hash_tree.set_hashes(blockhashes)
+
+            blockhash = block_hash(blockdata)
+            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
+            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
+            #        "%r .. %r: %s" %
+            #        (self.sharenum, blocknum, len(blockdata),
+            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
+
+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
+            # log.WEIRD: indicates undetected disk/network error, or more
+            # likely a programming error
+            self.log("hash failure in block=%d, shnum=%d on %s" %
+                    (blocknum, self.sharenum, self.bucket))
+            if self.block_hash_tree.needed_hashes(blocknum):
+                self.log(""" failure occurred when checking the block_hash_tree.
+                This suggests that either the block data was bad, or that the
+                block hashes we received along with it were bad.""")
+            else:
+                self.log(""" the failure probably occurred when checking the
+                share_hash_tree, which suggests that the share hashes we
+                received from the remote peer were bad.""")
+            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
+            self.log(" block length: %d" % len(blockdata))
+            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
+            if len(blockdata) < 100:
+                self.log(" block data: %r" % (blockdata,))
+            else:
+                self.log(" block data start/end: %r .. %r" %
+                        (blockdata[:50], blockdata[-50:]))
+            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
+            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
+            lines = []
+            for i,h in sorted(sharehashes.items()):
+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
+            lines = []
+            for i,h in blockhashes.items():
+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
+            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
+            raise BadOrMissingHash(le)
+
+        # If we made it here, the block is good. If the hash trees didn't
+        # like what they saw, they would have raised a BadHashError, causing
+        # our caller to see a Failure and thus ignore this block (as well as
+        # dropping this bucket).
+        return blockdata
+
+
 class Checker(log.PrefixingLogMixin):
     """I query all servers to see if M uniquely-numbered shares are
     available.
@@ -85,7 +513,9 @@ class Checker(log.PrefixingLogMixin):
             level = log.WEIRD
             if f.check(DeadReferenceError):
                 level = log.UNUSUAL
-            self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
+            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
+                     facility="tahoe.immutable.checker",
+                     failure=f, level=level, umid="AX7wZQ")
             return ({}, serverid, False)
 
         d.addCallbacks(_wrap_results, _trap_errs)
@@ -146,18 +576,18 @@ class Checker(log.PrefixingLogMixin):
 
         vcap = self._verifycap
         b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
-        veup = download.ValidatedExtendedURIProxy(b, vcap)
+        veup = ValidatedExtendedURIProxy(b, vcap)
         d = veup.start()
 
         def _got_ueb(vup):
             share_hash_tree = IncompleteHashTree(vcap.total_shares)
             share_hash_tree.set_hashes({0: vup.share_root_hash})
 
-            vrbp = download.ValidatedReadBucketProxy(sharenum, b,
-                                                     share_hash_tree,
-                                                     vup.num_segments,
-                                                     vup.block_size,
-                                                     vup.share_size)
+            vrbp = ValidatedReadBucketProxy(sharenum, b,
+                                            share_hash_tree,
+                                            vup.num_segments,
+                                            vup.block_size,
+                                            vup.share_size)
 
             # note: normal download doesn't use get_all_sharehashes(),
             # because it gets more data than necessary. We've discussed the
@@ -216,8 +646,8 @@ class Checker(log.PrefixingLogMixin):
                 return (False, sharenum, 'incompatible')
             elif f.check(layout.LayoutInvalid,
                          layout.RidiculouslyLargeURIExtensionBlock,
-                         download.BadOrMissingHash,
-                         download.BadURIExtensionHashValue):
+                         BadOrMissingHash,
+                         BadURIExtensionHashValue):
                 return (False, sharenum, 'corrupt')
 
             # if it wasn't one of those reasons, re-raise the error
index 70044a7ddeb9120d1d930221ff62200be0967f62..1d5be94867bce4a639f8e203172163530e274db7 100644 (file)
-import copy, os.path, stat
-from cStringIO import StringIO
+
+import binascii
+import copy
+import time
+now = time.time
 from zope.interface import implements
 from twisted.internet import defer
-from twisted.internet.interfaces import IPushProducer
-from twisted.protocols import basic
-from foolscap.api import eventually
-from allmydata.interfaces import IImmutableFileNode, ICheckable, \
-     IDownloadTarget, IUploadResults
-from allmydata.util import dictutil, log, base32
-from allmydata.uri import CHKFileURI, LiteralFileURI
-from allmydata.immutable.checker import Checker
-from allmydata.check_results import CheckResults, CheckAndRepairResults
-from allmydata.immutable.repairer import Repairer
-from allmydata.immutable import download
-
-class _ImmutableFileNodeBase(object):
-    implements(IImmutableFileNode, ICheckable)
-
-    def get_write_uri(self):
-        return None
-
-    def get_readonly_uri(self):
-        return self.get_uri()
-
-    def is_mutable(self):
-        return False
-
-    def is_readonly(self):
-        return True
-
-    def is_unknown(self):
-        return False
-
-    def is_allowed_in_immutable_directory(self):
-        return True
-
-    def raise_error(self):
-        pass
-
-    def __hash__(self):
-        return self.u.__hash__()
-    def __eq__(self, other):
-        if isinstance(other, _ImmutableFileNodeBase):
-            return self.u.__eq__(other.u)
-        else:
-            return False
-    def __ne__(self, other):
-        if isinstance(other, _ImmutableFileNodeBase):
-            return self.u.__eq__(other.u)
-        else:
-            return True
-
-class PortionOfFile:
-    # like a list slice (things[2:14]), but for a file on disk
-    def __init__(self, fn, offset=0, size=None):
-        self.f = open(fn, "rb")
-        self.f.seek(offset)
-        self.bytes_left = size
-
-    def read(self, size=None):
-        # bytes_to_read = min(size, self.bytes_left), but None>anything
-        if size is None:
-            bytes_to_read = self.bytes_left
-        elif self.bytes_left is None:
-            bytes_to_read = size
-        else:
-            bytes_to_read = min(size, self.bytes_left)
-        data = self.f.read(bytes_to_read)
-        if self.bytes_left is not None:
-            self.bytes_left -= len(data)
-        return data
-
-class DownloadCache:
-    implements(IDownloadTarget)
-
-    def __init__(self, filecap, storage_index, downloader,
-                 cachedirectorymanager):
-        self._downloader = downloader
-        self._uri = filecap
-        self._storage_index = storage_index
-        self.milestones = set() # of (offset,size,Deferred)
-        self.cachedirectorymanager = cachedirectorymanager
-        self.cachefile = None
-        self.download_in_progress = False
-        # five states:
-        #  new ImmutableFileNode, no downloads ever performed
-        #  new ImmutableFileNode, leftover file (partial)
-        #  new ImmutableFileNode, leftover file (whole)
-        #  download in progress, not yet complete
-        #  download complete
-
-    def when_range_available(self, offset, size):
-        assert isinstance(offset, (int,long))
-        assert isinstance(size, (int,long))
-
-        d = defer.Deferred()
-        self.milestones.add( (offset,size,d) )
-        self._check_milestones()
-        if self.milestones and not self.download_in_progress:
-            self.download_in_progress = True
-            log.msg(format=("immutable filenode read [%(si)s]: " +
-                            "starting download"),
-                    si=base32.b2a(self._storage_index),
-                    umid="h26Heg", level=log.OPERATIONAL)
-            d2 = self._downloader.download(self._uri, self)
-            d2.addBoth(self._download_done)
-            d2.addErrback(self._download_failed)
-            d2.addErrback(log.err, umid="cQaM9g")
-        return d
-
-    def read(self, consumer, offset, size):
-        assert offset+size <= self.get_filesize()
-        if not self.cachefile:
-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
-        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
-        d = basic.FileSender().beginFileTransfer(f, consumer)
-        d.addCallback(lambda lastSent: consumer)
-        return d
-
-    def _download_done(self, res):
-        # clear download_in_progress, so failed downloads can be re-tried
-        self.download_in_progress = False
-        return res
-
-    def _download_failed(self, f):
-        # tell anyone who's waiting that we failed
-        for m in self.milestones:
-            (offset,size,d) = m
-            eventually(d.errback, f)
-        self.milestones.clear()
-
-    def _check_milestones(self):
-        current_size = self.get_filesize()
-        for m in list(self.milestones):
-            (offset,size,d) = m
-            if offset+size <= current_size:
-                log.msg(format=("immutable filenode read [%(si)s] " +
-                                "%(offset)d+%(size)d vs %(filesize)d: " +
-                                "done"),
-                        si=base32.b2a(self._storage_index),
-                        offset=offset, size=size, filesize=current_size,
-                        umid="nuedUg", level=log.NOISY)
-                self.milestones.discard(m)
-                eventually(d.callback, None)
-            else:
-                log.msg(format=("immutable filenode read [%(si)s] " +
-                                "%(offset)d+%(size)d vs %(filesize)d: " +
-                                "still waiting"),
-                        si=base32.b2a(self._storage_index),
-                        offset=offset, size=size, filesize=current_size,
-                        umid="8PKOhg", level=log.NOISY)
-
-    def get_filesize(self):
-        if not self.cachefile:
-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
-        try:
-            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
-        except OSError:
-            filesize = 0
-        return filesize
-
-
-    def open(self, size):
-        if not self.cachefile:
-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
-        self.f = open(self.cachefile.get_filename(), "wb")
-
-    def write(self, data):
-        self.f.write(data)
-        self._check_milestones()
-
-    def close(self):
-        self.f.close()
-        self._check_milestones()
-
-    def fail(self, why):
-        pass
-    def register_canceller(self, cb):
-        pass
-    def finish(self):
-        return None
-    # The following methods are just because the target might be a
-    # repairer.DownUpConnector, and just because the current CHKUpload object
-    # expects to find the storage index and encoding parameters in its
-    # Uploadable.
-    def set_storageindex(self, storageindex):
-        pass
-    def set_encodingparams(self, encodingparams):
-        pass
+from twisted.internet.interfaces import IConsumer
 
+from allmydata.interfaces import IImmutableFileNode, IUploadResults
+from allmydata import uri
+from allmydata.check_results import CheckResults, CheckAndRepairResults
+from allmydata.util.dictutil import DictOfSets
+from pycryptopp.cipher.aes import AES
 
-class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
-    def __init__(self, filecap, storage_broker, secret_holder,
-                 downloader, history, cachedirectorymanager):
-        assert isinstance(filecap, CHKFileURI)
-        self.u = filecap
+# local imports
+from allmydata.immutable.checker import Checker
+from allmydata.immutable.repairer import Repairer
+from allmydata.immutable.downloader.node import DownloadNode
+from allmydata.immutable.downloader.status import DownloadStatus
+
+class CiphertextFileNode:
+    def __init__(self, verifycap, storage_broker, secret_holder,
+                 terminator, history, download_status=None):
+        assert isinstance(verifycap, uri.CHKFileVerifierURI)
+        self._verifycap = verifycap
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
-        self._downloader = downloader
-        self._history = history
-        storage_index = self.get_storage_index()
-        self.download_cache = DownloadCache(filecap, storage_index, downloader,
-                                            cachedirectorymanager)
-        prefix = self.u.get_verify_cap().to_string()
-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
-        self.log("starting", level=log.OPERATIONAL)
+        if download_status is None:
+            ds = DownloadStatus(verifycap.storage_index, verifycap.size)
+            if history:
+                history.add_download(ds)
+            download_status = ds
+        self._node = DownloadNode(verifycap, storage_broker, secret_holder,
+                                  terminator, history, download_status)
+
+    def read(self, consumer, offset=0, size=None, read_ev=None):
+        """I am the main entry point, from which FileNode.read() can get
+        data. I feed the consumer with the desired range of ciphertext. I
+        return a Deferred that fires (with the consumer) when the read is
+        finished."""
+        return self._node.read(consumer, offset, size, read_ev)
+
+    def get_segment(self, segnum):
+        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
+        Deferred that fires with (offset,data) when the desired segment is
+        available, and c is an object on which c.cancel() can be called to
+        disavow interest in the segment (after which 'd' will never fire).
+
+        You probably need to know the segment size before calling this,
+        unless you want the first few bytes of the file. If you ask for a
+        segment number which turns out to be too large, the Deferred will
+        errback with BadSegmentNumberError.
+
+        The Deferred fires with the offset of the first byte of the data
+        segment, so that you can call get_segment() before knowing the
+        segment size, and still know which data you received.
+        """
+        return self._node.get_segment(segnum)
+
+    def get_segment_size(self):
+        # return a Deferred that fires with the file's real segment size
+        return self._node.get_segsize()
 
-    def get_size(self):
-        return self.u.get_size()
-    def get_current_size(self):
-        return defer.succeed(self.get_size())
-
-    def get_cap(self):
-        return self.u
-    def get_readcap(self):
-        return self.u.get_readonly()
+    def get_storage_index(self):
+        return self._verifycap.storage_index
     def get_verify_cap(self):
-        return self.u.get_verify_cap()
-    def get_repair_cap(self):
-        # CHK files can be repaired with just the verifycap
-        return self.u.get_verify_cap()
+        return self._verifycap
+    def get_size(self):
+        return self._verifycap.size
 
-    def get_uri(self):
-        return self.u.to_string()
+    def raise_error(self):
+        pass
 
-    def get_storage_index(self):
-        return self.u.get_storage_index()
 
     def check_and_repair(self, monitor, verify=False, add_lease=False):
-        verifycap = self.get_verify_cap()
+        verifycap = self._verifycap
+        storage_index = verifycap.storage_index
         sb = self._storage_broker
         servers = sb.get_all_servers()
         sh = self._secret_holder
@@ -238,7 +85,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
                     monitor=monitor)
         d = c.start()
         def _maybe_repair(cr):
-            crr = CheckAndRepairResults(self.u.get_storage_index())
+            crr = CheckAndRepairResults(storage_index)
             crr.pre_repair_results = cr
             if cr.is_healthy():
                 crr.post_repair_results = cr
@@ -248,24 +95,25 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
                 crr.repair_successful = False # until proven successful
                 def _gather_repair_results(ur):
                     assert IUploadResults.providedBy(ur), ur
-                    # clone the cr -- check results to form the basic of the prr -- post-repair results
+                    # clone the cr (check results) to form the basis of the
+                    # prr (post-repair results)
                     prr = CheckResults(cr.uri, cr.storage_index)
                     prr.data = copy.deepcopy(cr.data)
 
                     sm = prr.data['sharemap']
-                    assert isinstance(sm, dictutil.DictOfSets), sm
+                    assert isinstance(sm, DictOfSets), sm
                     sm.update(ur.sharemap)
                     servers_responding = set(prr.data['servers-responding'])
                     servers_responding.union(ur.sharemap.iterkeys())
                     prr.data['servers-responding'] = list(servers_responding)
                     prr.data['count-shares-good'] = len(sm)
                     prr.data['count-good-share-hosts'] = len(sm)
-                    is_healthy = bool(len(sm) >= self.u.total_shares)
-                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
+                    is_healthy = bool(len(sm) >= verifycap.total_shares)
+                    is_recoverable = bool(len(sm) >= verifycap.needed_shares)
                     prr.set_healthy(is_healthy)
                     prr.set_recoverable(is_recoverable)
                     crr.repair_successful = is_healthy
-                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
+                    prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
 
                     crr.post_repair_results = prr
                     return crr
@@ -275,8 +123,8 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
                     crr.repair_successful = False
                     crr.repair_failure = f
                     return f
-                r = Repairer(storage_broker=sb, secret_holder=sh,
-                             verifycap=verifycap, monitor=monitor)
+                r = Repairer(self, storage_broker=sb, secret_holder=sh,
+                             monitor=monitor)
                 d = r.start()
                 d.addCallbacks(_gather_repair_results, _repair_error)
                 return d
@@ -285,7 +133,7 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
         return d
 
     def check(self, monitor, verify=False, add_lease=False):
-        verifycap = self.get_verify_cap()
+        verifycap = self._verifycap
         sb = self._storage_broker
         servers = sb.get_all_servers()
         sh = self._secret_holder
@@ -295,81 +143,130 @@ class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
                     monitor=monitor)
         return v.start()
 
+
+class DecryptingConsumer:
+    """I sit between a CiphertextDownloader (which acts as a Producer) and
+    the real Consumer, decrypting everything that passes by. The real
+    Consumer sees the real Producer, but the Producer sees us instead of the
+    real consumer."""
+    implements(IConsumer)
+
+    def __init__(self, consumer, readkey, offset, read_event):
+        self._consumer = consumer
+        self._read_event = read_event
+        # TODO: pycryptopp CTR-mode needs random-access operations: I want
+        # either a=AES(readkey, offset) or better yet both of:
+        #  a=AES(readkey, offset=0)
+        #  a.process(ciphertext, offset=xyz)
+        # For now, we fake it with the existing iv= argument.
+        offset_big = offset // 16
+        offset_small = offset % 16
+        iv = binascii.unhexlify("%032x" % offset_big)
+        self._decryptor = AES(readkey, iv=iv)
+        self._decryptor.process("\x00"*offset_small)
+
+    def registerProducer(self, producer, streaming):
+        # this passes through, so the real consumer can flow-control the real
+        # producer. Therefore we don't need to provide any IPushProducer
+        # methods. We implement all the IConsumer methods as pass-throughs,
+        # and only intercept write() to perform decryption.
+        self._consumer.registerProducer(producer, streaming)
+    def unregisterProducer(self):
+        self._consumer.unregisterProducer()
+    def write(self, ciphertext):
+        started = now()
+        plaintext = self._decryptor.process(ciphertext)
+        elapsed = now() - started
+        self._read_event.update(0, elapsed, 0)
+        self._consumer.write(plaintext)
+
+class ImmutableFileNode:
+    implements(IImmutableFileNode)
+
+    # I wrap a CiphertextFileNode with a decryption key
+    def __init__(self, filecap, storage_broker, secret_holder, terminator,
+                 history):
+        assert isinstance(filecap, uri.CHKFileURI)
+        verifycap = filecap.get_verify_cap()
+        ds = DownloadStatus(verifycap.storage_index, verifycap.size)
+        if history:
+            history.add_download(ds)
+        self._download_status = ds
+        self._cnode = CiphertextFileNode(verifycap, storage_broker,
+                                         secret_holder, terminator, history, ds)
+        assert isinstance(filecap, uri.CHKFileURI)
+        self.u = filecap
+        self._readkey = filecap.key
+
+    # TODO: I'm not sure about this.. what's the use case for node==node? If
+    # we keep it here, we should also put this on CiphertextFileNode
+    def __hash__(self):
+        return self.u.__hash__()
+    def __eq__(self, other):
+        if isinstance(other, ImmutableFileNode):
+            return self.u.__eq__(other.u)
+        else:
+            return False
+    def __ne__(self, other):
+        if isinstance(other, ImmutableFileNode):
+            return self.u.__eq__(other.u)
+        else:
+            return True
+
     def read(self, consumer, offset=0, size=None):
-        self.log("read", offset=offset, size=size,
-                 umid="UPP8FA", level=log.OPERATIONAL)
-        if size is None:
-            size = self.get_size() - offset
-        size = min(size, self.get_size() - offset)
-
-        if offset == 0 and size == self.get_size():
-            # don't use the cache, just do a normal streaming download
-            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
-            target = download.ConsumerAdapter(consumer)
-            return self._downloader.download(self.get_cap(), target,
-                                             self._parentmsgid,
-                                             history=self._history)
-
-        d = self.download_cache.when_range_available(offset, size)
-        d.addCallback(lambda res:
-                      self.download_cache.read(consumer, offset, size))
+        actual_size = size
+        if actual_size == None:
+            actual_size = self.u.size
+        actual_size = actual_size - offset
+        read_ev = self._download_status.add_read_event(offset,actual_size,
+                                                       now())
+        decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
+        d = self._cnode.read(decryptor, offset, size, read_ev)
+        d.addCallback(lambda dc: consumer)
         return d
 
-class LiteralProducer:
-    implements(IPushProducer)
-    def resumeProducing(self):
-        pass
-    def stopProducing(self):
+    def raise_error(self):
         pass
 
+    def get_write_uri(self):
+        return None
 
-class LiteralFileNode(_ImmutableFileNodeBase):
-
-    def __init__(self, filecap):
-        assert isinstance(filecap, LiteralFileURI)
-        self.u = filecap
-
-    def get_size(self):
-        return len(self.u.data)
-    def get_current_size(self):
-        return defer.succeed(self.get_size())
+    def get_readonly_uri(self):
+        return self.get_uri()
 
+    def get_uri(self):
+        return self.u.to_string()
     def get_cap(self):
         return self.u
     def get_readcap(self):
-        return self.u
+        return self.u.get_readonly()
     def get_verify_cap(self):
-        return None
+        return self.u.get_verify_cap()
     def get_repair_cap(self):
-        return None
-
-    def get_uri(self):
-        return self.u.to_string()
+        # CHK files can be repaired with just the verifycap
+        return self.u.get_verify_cap()
 
     def get_storage_index(self):
-        return None
+        return self.u.get_storage_index()
 
-    def check(self, monitor, verify=False, add_lease=False):
-        return defer.succeed(None)
+    def get_size(self):
+        return self.u.get_size()
+    def get_current_size(self):
+        return defer.succeed(self.get_size())
 
-    def check_and_repair(self, monitor, verify=False, add_lease=False):
-        return defer.succeed(None)
+    def is_mutable(self):
+        return False
 
-    def read(self, consumer, offset=0, size=None):
-        if size is None:
-            data = self.u.data[offset:]
-        else:
-            data = self.u.data[offset:offset+size]
-
-        # We use twisted.protocols.basic.FileSender, which only does
-        # non-streaming, i.e. PullProducer, where the receiver/consumer must
-        # ask explicitly for each chunk of data. There are only two places in
-        # the Twisted codebase that can't handle streaming=False, both of
-        # which are in the upload path for an FTP/SFTP server
-        # (protocols.ftp.FileConsumer and
-        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
-        # likely to be used as the target for a Tahoe download.
-
-        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
-        d.addCallback(lambda lastSent: consumer)
-        return d
+    def is_readonly(self):
+        return True
+
+    def is_unknown(self):
+        return False
+
+    def is_allowed_in_immutable_directory(self):
+        return True
+
+    def check_and_repair(self, monitor, verify=False, add_lease=False):
+        return self._cnode.check_and_repair(monitor, verify, add_lease)
+    def check(self, monitor, verify=False, add_lease=False):
+        return self._cnode.check(monitor, verify, add_lease)
index 6e07da7be2e9bc3f092d417e3cc7a16fe48c153c..27fb84450abba9723d874b2fc2e1ffadab9dcf0e 100644 (file)
@@ -74,12 +74,16 @@ limitations described in #346.
 # they are still provided when writing so that older versions of Tahoe can
 # read them.
 
+FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
+
 def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
                             num_share_hashes, uri_extension_size_max, nodeid):
     # Use layout v1 for small files, so they'll be readable by older versions
     # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
     # by tahoe-1.3.0 or later.
     try:
+        if FORCE_V2:
+            raise FileTooLargeError
         wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
                                num_share_hashes, uri_extension_size_max, nodeid)
     except FileTooLargeError:
diff --git a/src/allmydata/immutable/literal.py b/src/allmydata/immutable/literal.py
new file mode 100644 (file)
index 0000000..09466cb
--- /dev/null
@@ -0,0 +1,104 @@
+from cStringIO import StringIO
+from zope.interface import implements
+from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer
+from twisted.protocols import basic
+from allmydata.interfaces import IImmutableFileNode, ICheckable
+from allmydata.uri import LiteralFileURI
+
+class _ImmutableFileNodeBase(object):
+    implements(IImmutableFileNode, ICheckable)
+
+    def get_write_uri(self):
+        return None
+
+    def get_readonly_uri(self):
+        return self.get_uri()
+
+    def is_mutable(self):
+        return False
+
+    def is_readonly(self):
+        return True
+
+    def is_unknown(self):
+        return False
+
+    def is_allowed_in_immutable_directory(self):
+        return True
+
+    def raise_error(self):
+        pass
+
+    def __hash__(self):
+        return self.u.__hash__()
+    def __eq__(self, other):
+        if isinstance(other, _ImmutableFileNodeBase):
+            return self.u.__eq__(other.u)
+        else:
+            return False
+    def __ne__(self, other):
+        if isinstance(other, _ImmutableFileNodeBase):
+            return self.u.__eq__(other.u)
+        else:
+            return True
+
+
+class LiteralProducer:
+    implements(IPushProducer)
+    def resumeProducing(self):
+        pass
+    def stopProducing(self):
+        pass
+
+
+class LiteralFileNode(_ImmutableFileNodeBase):
+
+    def __init__(self, filecap):
+        assert isinstance(filecap, LiteralFileURI)
+        self.u = filecap
+
+    def get_size(self):
+        return len(self.u.data)
+    def get_current_size(self):
+        return defer.succeed(self.get_size())
+
+    def get_cap(self):
+        return self.u
+    def get_readcap(self):
+        return self.u
+    def get_verify_cap(self):
+        return None
+    def get_repair_cap(self):
+        return None
+
+    def get_uri(self):
+        return self.u.to_string()
+
+    def get_storage_index(self):
+        return None
+
+    def check(self, monitor, verify=False, add_lease=False):
+        return defer.succeed(None)
+
+    def check_and_repair(self, monitor, verify=False, add_lease=False):
+        return defer.succeed(None)
+
+    def read(self, consumer, offset=0, size=None):
+        if size is None:
+            data = self.u.data[offset:]
+        else:
+            data = self.u.data[offset:offset+size]
+
+        # We use twisted.protocols.basic.FileSender, which only does
+        # non-streaming, i.e. PullProducer, where the receiver/consumer must
+        # ask explicitly for each chunk of data. There are only two places in
+        # the Twisted codebase that can't handle streaming=False, both of
+        # which are in the upload path for an FTP/SFTP server
+        # (protocols.ftp.FileConsumer and
+        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
+        # likely to be used as the target for a Tahoe download.
+
+        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
+        d.addCallback(lambda lastSent: consumer)
+        return d
index fa6a604ef41f13edc076476f94d6ce12244859ce..64fb9a1986b94c41ce4dcd8beefc2b4039fb1b98 100644 (file)
@@ -1,17 +1,14 @@
 from zope.interface import implements
 from twisted.internet import defer
 from allmydata.storage.server import si_b2a
-from allmydata.util import log, observer
-from allmydata.util.assertutil import precondition, _assert
-from allmydata.uri import CHKFileVerifierURI
-from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
-from twisted.internet.interfaces import IConsumer
+from allmydata.util import log, consumer
+from allmydata.util.assertutil import precondition
+from allmydata.interfaces import IEncryptedUploadable
 
-from allmydata.immutable import download, upload
-
-import collections
+from allmydata.immutable import upload
 
 class Repairer(log.PrefixingLogMixin):
+    implements(IEncryptedUploadable)
     """I generate any shares which were not available and upload them to
     servers.
 
@@ -43,195 +40,51 @@ class Repairer(log.PrefixingLogMixin):
     cancelled (by invoking its raise_if_cancelled() method).
     """
 
-    def __init__(self, storage_broker, secret_holder, verifycap, monitor):
-        assert precondition(isinstance(verifycap, CHKFileVerifierURI))
-
-        logprefix = si_b2a(verifycap.get_storage_index())[:5]
+    def __init__(self, filenode, storage_broker, secret_holder, monitor):
+        logprefix = si_b2a(filenode.get_storage_index())[:5]
         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
                                        prefix=logprefix)
-
+        self._filenode = filenode
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
-        self._verifycap = verifycap
         self._monitor = monitor
+        self._offset = 0
 
     def start(self):
         self.log("starting repair")
-        duc = DownUpConnector()
-        dl = download.CiphertextDownloader(self._storage_broker,
-                                           self._verifycap, target=duc,
-                                           monitor=self._monitor)
-        ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
-
-        d = defer.Deferred()
-
-        # If the upload or the download fails or is stopped, then the repair
-        # failed.
-        def _errb(f):
-            d.errback(f)
-            return None
-
-        # If the upload succeeds, then the repair has succeeded.
-        def _cb(res):
-            d.callback(res)
-        ul.start(duc).addCallbacks(_cb, _errb)
-
-        # If the download fails or is stopped, then the repair failed.
-        d2 = dl.start()
-        d2.addErrback(_errb)
-
-        # We ignore the callback from d2.  Is this right?  Ugh.
-
+        d = self._filenode.get_segment_size()
+        def _got_segsize(segsize):
+            vcap = self._filenode.get_verify_cap()
+            k = vcap.needed_shares
+            N = vcap.total_shares
+            happy = upload.BaseUploadable.default_encoding_param_happy
+            self._encodingparams = (k, happy, N, segsize)
+            ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
+            return ul.start(self) # I am the IEncryptedUploadable
+        d.addCallback(_got_segsize)
         return d
 
-class DownUpConnector(log.PrefixingLogMixin):
-    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
-    """I act like an 'encrypted uploadable' -- something that a local
-    uploader can read ciphertext from in order to upload the ciphertext.
-    However, unbeknownst to the uploader, I actually download the ciphertext
-    from a CiphertextDownloader instance as it is needed.
-
-    On the other hand, I act like a 'download target' -- something that a
-    local downloader can write ciphertext to as it downloads the ciphertext.
-    That downloader doesn't realize, of course, that I'm just turning around
-    and giving the ciphertext to the uploader."""
-
-    # The theory behind this class is nice: just satisfy two separate
-    # interfaces. The implementation is slightly horrible, because of
-    # "impedance mismatch" -- the downloader expects to be able to
-    # synchronously push data in, and the uploader expects to be able to read
-    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
-    # The two interfaces have different APIs for pausing/unpausing. The
-    # uploader requests metadata like size and encodingparams which the
-    # downloader provides either eventually or not at all (okay I just now
-    # extended the downloader to provide encodingparams). Most of this
-    # slightly horrible code would disappear if CiphertextDownloader just
-    # used this object as an IConsumer (plus maybe a couple of other methods)
-    # and if the Uploader simply expected to be treated as an IConsumer (plus
-    # maybe a couple of other things).
-
-    def __init__(self, buflim=2**19):
-        """If we're already holding at least buflim bytes, then tell the
-        downloader to pause until we have less than buflim bytes."""
-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
-        self.buflim = buflim
-        self.bufs = collections.deque() # list of strings
-        self.bufsiz = 0 # how many bytes total in bufs
-
-        # list of deferreds which will fire with the requested ciphertext
-        self.next_read_ds = collections.deque()
-
-        # how many bytes of ciphertext were requested by each deferred
-        self.next_read_lens = collections.deque()
-
-        self._size_osol = observer.OneShotObserverList()
-        self._encodingparams_osol = observer.OneShotObserverList()
-        self._storageindex_osol = observer.OneShotObserverList()
-        self._closed_to_pusher = False
-
-        # once seg size is available, the following attribute will be created
-        # to hold it:
-
-        # self.encodingparams # (provided by the object which is pushing data
-        # into me, required by the object which is pulling data out of me)
-
-        # open() will create the following attribute:
-        # self.size # size of the whole file (provided by the object which is
-        # pushing data into me, required by the object which is pulling data
-        # out of me)
-
-        # set_upload_status() will create the following attribute:
-
-        # self.upload_status # XXX do we need to actually update this? Is
-        # anybody watching the results during a repair?
-
-    def _satisfy_reads_if_possible(self):
-        assert bool(self.next_read_ds) == bool(self.next_read_lens)
-        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
-                                     or self._closed_to_pusher):
-            nrd = self.next_read_ds.popleft()
-            nrl = self.next_read_lens.popleft()
-
-            # Pick out the requested number of bytes from self.bufs, turn it
-            # into a string, and callback the deferred with that.
-            res = []
-            ressize = 0
-            while ressize < nrl and self.bufs:
-                nextbuf = self.bufs.popleft()
-                res.append(nextbuf)
-                ressize += len(nextbuf)
-                if ressize > nrl:
-                    extra = ressize - nrl
-                    self.bufs.appendleft(nextbuf[:-extra])
-                    res[-1] = nextbuf[:-extra]
-            assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
-            assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
-            self.bufsiz -= nrl
-            if self.bufsiz < self.buflim and self.producer:
-                self.producer.resumeProducing()
-            nrd.callback(res)
-
-    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
-    # the perspective of a downloader I am an IDownloadTarget and an
-    # IConsumer.)
-    def registerProducer(self, producer, streaming):
-        assert streaming # We know how to handle only streaming producers.
-        self.producer = producer # the downloader
-    def unregisterProducer(self):
-        self.producer = None
-    def open(self, size):
-        self.size = size
-        self._size_osol.fire(self.size)
-    def set_encodingparams(self, encodingparams):
-        self.encodingparams = encodingparams
-        self._encodingparams_osol.fire(self.encodingparams)
-    def set_storageindex(self, storageindex):
-        self.storageindex = storageindex
-        self._storageindex_osol.fire(self.storageindex)
-    def write(self, data):
-        precondition(data) # please don't write empty strings
-        self.bufs.append(data)
-        self.bufsiz += len(data)
-        self._satisfy_reads_if_possible()
-        if self.bufsiz >= self.buflim and self.producer:
-            self.producer.pauseProducing()
-    def finish(self):
-        pass
-    def close(self):
-        self._closed_to_pusher = True
-        # Any reads which haven't been satisfied by now are going to
-        # have to be satisfied with short reads.
-        self._satisfy_reads_if_possible()
 
     # methods to satisfy the IEncryptedUploader interface
     # (From the perspective of an uploader I am an IEncryptedUploadable.)
     def set_upload_status(self, upload_status):
         self.upload_status = upload_status
     def get_size(self):
-        if hasattr(self, 'size'): # attribute created by self.open()
-            return defer.succeed(self.size)
-        else:
-            return self._size_osol.when_fired()
+        size = self._filenode.get_size()
+        assert size is not None
+        return defer.succeed(size)
     def get_all_encoding_parameters(self):
-        # We have to learn the encoding params from pusher.
-        if hasattr(self, 'encodingparams'):
-            # attribute created by self.set_encodingparams()
-            return defer.succeed(self.encodingparams)
-        else:
-            return self._encodingparams_osol.when_fired()
+        return defer.succeed(self._encodingparams)
     def read_encrypted(self, length, hash_only):
-        """Returns a deferred which eventually fired with the requested
-        ciphertext."""
+        """Returns a deferred which eventually fires with the requested
+        ciphertext, as a list of strings."""
         precondition(length) # please don't ask to read 0 bytes
-        d = defer.Deferred()
-        self.next_read_ds.append(d)
-        self.next_read_lens.append(length)
-        self._satisfy_reads_if_possible()
+        mc = consumer.MemoryConsumer()
+        d = self._filenode.read(mc, self._offset, length)
+        self._offset += length
+        d.addCallback(lambda ign: mc.chunks)
         return d
     def get_storage_index(self):
-        # We have to learn the storage index from pusher.
-        if hasattr(self, 'storageindex'):
-            # attribute created by self.set_storageindex()
-            return defer.succeed(self.storageindex)
-        else:
-            return self._storageindex.when_fired()
+        return self._filenode.get_storage_index()
+    def close(self):
+        pass
index f0d3b0bb05d81a43f2da2d34a1d9a700b9423e2e..5dd257bd761a81e243e7b66711ae1374d8c4970c 100644 (file)
@@ -20,7 +20,8 @@ from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
-     NoServersError, InsufficientVersionError, UploadUnhappinessError
+     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
+     DEFAULT_MAX_SEGMENT_SIZE
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
@@ -1234,7 +1235,8 @@ class AssistedUploader:
         return self._upload_status
 
 class BaseUploadable:
-    default_max_segment_size = 128*KiB # overridden by max_segment_size
+    # this is overridden by max_segment_size
+    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
     default_encoding_param_k = 3 # overridden by encoding_parameters
     default_encoding_param_happy = 7
     default_encoding_param_n = 10