From 1576c35d387531930424a2c1a9b179b65c493de2 Mon Sep 17 00:00:00 2001
From: Kevan Carstensen <kevan@isnotajoke.com>
Date: Mon, 1 Aug 2011 18:39:31 -0700
Subject: [PATCH] mutable/publish: teach the publisher how to publish MDMF
 mutable files

Like the downloader, the publisher needs some substantial changes to handle multiple segment mutable files.
---
 src/allmydata/mutable/publish.py | 1288 +++++++++++++++++++++---------
 1 file changed, 904 insertions(+), 384 deletions(-)

diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index 580682b6..066ee904 100644
--- a/src/allmydata/mutable/publish.py
+++ b/src/allmydata/mutable/publish.py
@@ -1,11 +1,13 @@
 
 
-import os, struct, time
+import os, time
+from StringIO import StringIO
 from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import failure
-from allmydata.interfaces import IPublishStatus
+from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
+                                 IMutableUploadable
 from allmydata.util import base32, hashutil, mathutil, idlib, log
 from allmydata.util.dictutil import DictOfSets
 from allmydata import hashtree, codec
@@ -16,8 +18,14 @@ from foolscap.api import eventually, fireEventually
 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
      UncoordinatedWriteError, NotEnoughServersError
 from allmydata.mutable.servermap import ServerMap
-from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
-     unpack_checkstring, SIGNED_PREFIX
+from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
+                                     SDMFSlotWriteProxy
+
+KiB = 1024
+DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
+PUSHING_BLOCKS_STATE = 0
+PUSHING_EVERYTHING_ELSE_STATE = 1
+DONE_STATE = 2
 
 class PublishStatus:
     implements(IPublishStatus)
@@ -101,12 +109,16 @@ class Publish:
         self._log_number = num
         self._running = True
         self._first_write_error = None
+        self._last_failure = None
 
         self._status = PublishStatus()
         self._status.set_storage_index(self._storage_index)
         self._status.set_helper(False)
         self._status.set_progress(0.0)
         self._status.set_active(True)
+        self._version = self._node.get_version()
+        assert self._version in (SDMF_VERSION, MDMF_VERSION)
+
 
     def get_status(self):
         return self._status
@@ -118,6 +130,212 @@ class Publish:
             kwargs["facility"] = "tahoe.mutable.publish"
         return log.msg(*args, **kwargs)
 
+
+    def update(self, data, offset, blockhashes, version):
+        """
+        I replace the contents of this file with the contents of data,
+        starting at offset. I return a Deferred that fires with None
+        when the replacement has been completed, or with an error if
+        something went wrong during the process.
+
+        Note that this process will not upload new shares. If the file
+        being updated is in need of repair, callers will have to repair
+        it on their own.
+        """
+        # How this works:
+        # 1: Make peer assignments. We'll assign each share that we know
+        # about on the grid to that peer that currently holds that
+        # share, and will not place any new shares.
+        # 2: Setup encoding parameters. Most of these will stay the same
+        # -- datalength will change, as will some of the offsets.
+        # 3. Upload the new segments.
+        # 4. Be done.
+        assert IMutableUploadable.providedBy(data)
+
+        self.data = data
+
+        # XXX: Use the MutableFileVersion instead.
+        self.datalength = self._node.get_size()
+        if data.get_size() > self.datalength:
+            self.datalength = data.get_size()
+
+        self.log("starting update")
+        self.log("adding new data of length %d at offset %d" % \
+                    (data.get_size(), offset))
+        self.log("new data length is %d" % self.datalength)
+        self._status.set_size(self.datalength)
+        self._status.set_status("Started")
+        self._started = time.time()
+
+        self.done_deferred = defer.Deferred()
+
+        self._writekey = self._node.get_writekey()
+        assert self._writekey, "need write capability to publish"
+
+        # first, which servers will we publish to? We require that the
+        # servermap was updated in MODE_WRITE, so we can depend upon the
+        # peerlist computed by that process instead of computing our own.
+        assert self._servermap
+        assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
+        # we will push a version that is one larger than anything present
+        # in the grid, according to the servermap.
+        self._new_seqnum = self._servermap.highest_seqnum() + 1
+        self._status.set_servermap(self._servermap)
+
+        self.log(format="new seqnum will be %(seqnum)d",
+                 seqnum=self._new_seqnum, level=log.NOISY)
+
+        # We're updating an existing file, so all of the following
+        # should be available.
+        self.readkey = self._node.get_readkey()
+        self.required_shares = self._node.get_required_shares()
+        assert self.required_shares is not None
+        self.total_shares = self._node.get_total_shares()
+        assert self.total_shares is not None
+        self._status.set_encoding(self.required_shares, self.total_shares)
+
+        self._pubkey = self._node.get_pubkey()
+        assert self._pubkey
+        self._privkey = self._node.get_privkey()
+        assert self._privkey
+        self._encprivkey = self._node.get_encprivkey()
+
+        sb = self._storage_broker
+        full_peerlist = [(s.get_serverid(), s.get_rref())
+                         for s in sb.get_servers_for_psi(self._storage_index)]
+        self.full_peerlist = full_peerlist # for use later, immutable
+        self.bad_peers = set() # peerids who have errbacked/refused requests
+
+        # This will set self.segment_size, self.num_segments, and
+        # self.fec. TODO: Does it know how to do the offset? Probably
+        # not. So do that part next.
+        self.setup_encoding_parameters(offset=offset)
+
+        # if we experience any surprises (writes which were rejected because
+        # our test vector did not match, or shares which we didn't expect to
+        # see), we set this flag and report an UncoordinatedWriteError at the
+        # end of the publish process.
+        self.surprised = False
+
+        # we keep track of three tables. The first is our goal: which share
+        # we want to see on which servers. This is initially populated by the
+        # existing servermap.
+        self.goal = set() # pairs of (peerid, shnum) tuples
+
+        # the second table is our list of outstanding queries: those which
+        # are in flight and may or may not be delivered, accepted, or
+        # acknowledged. Items are added to this table when the request is
+        # sent, and removed when the response returns (or errbacks).
+        self.outstanding = set() # (peerid, shnum) tuples
+
+        # the third is a table of successes: share which have actually been
+        # placed. These are populated when responses come back with success.
+        # When self.placed == self.goal, we're done.
+        self.placed = set() # (peerid, shnum) tuples
+
+        # we also keep a mapping from peerid to RemoteReference. Each time we
+        # pull a connection out of the full peerlist, we add it to this for
+        # use later.
+        self.connections = {}
+
+        self.bad_share_checkstrings = {}
+
+        # This is set at the last step of the publishing process.
+        self.versioninfo = ""
+
+        # we use the servermap to populate the initial goal: this way we will
+        # try to update each existing share in place. Since we're
+        # updating, we ignore damaged and missing shares -- callers must
+        # do a repair to repair and recreate these.
+        for (peerid, shnum) in self._servermap.servermap:
+            self.goal.add( (peerid, shnum) )
+            self.connections[peerid] = self._servermap.connections[peerid]
+        self.writers = {}
+
+        # SDMF files are updated differently.
+        self._version = MDMF_VERSION
+        writer_class = MDMFSlotWriteProxy
+
+        # For each (peerid, shnum) in self.goal, we make a
+        # write proxy for that peer. We'll use this to write
+        # shares to the peer.
+        for key in self.goal:
+            peerid, shnum = key
+            write_enabler = self._node.get_write_enabler(peerid)
+            renew_secret = self._node.get_renewal_secret(peerid)
+            cancel_secret = self._node.get_cancel_secret(peerid)
+            secrets = (write_enabler, renew_secret, cancel_secret)
+
+            self.writers[shnum] =  writer_class(shnum,
+                                                self.connections[peerid],
+                                                self._storage_index,
+                                                secrets,
+                                                self._new_seqnum,
+                                                self.required_shares,
+                                                self.total_shares,
+                                                self.segment_size,
+                                                self.datalength)
+            self.writers[shnum].peerid = peerid
+            assert (peerid, shnum) in self._servermap.servermap
+            old_versionid, old_timestamp = self._servermap.servermap[key]
+            (old_seqnum, old_root_hash, old_salt, old_segsize,
+             old_datalength, old_k, old_N, old_prefix,
+             old_offsets_tuple) = old_versionid
+            self.writers[shnum].set_checkstring(old_seqnum,
+                                                old_root_hash,
+                                                old_salt)
+
+        # Our remote shares will not have a complete checkstring until
+        # after we are done writing share data and have started to write
+        # blocks. In the meantime, we need to know what to look for when
+        # writing, so that we can detect UncoordinatedWriteErrors.
+        self._checkstring = self.writers.values()[0].get_checkstring()
+
+        # Now, we start pushing shares.
+        self._status.timings["setup"] = time.time() - self._started
+        # First, we encrypt, encode, and publish the shares that we need
+        # to encrypt, encode, and publish.
+
+        # Our update process fetched these for us. We need to update
+        # them in place as publishing happens.
+        self.blockhashes = {} # (shnum, [blochashes])
+        for (i, bht) in blockhashes.iteritems():
+            # We need to extract the leaves from our old hash tree.
+            old_segcount = mathutil.div_ceil(version[4],
+                                             version[3])
+            h = hashtree.IncompleteHashTree(old_segcount)
+            bht = dict(enumerate(bht))
+            h.set_hashes(bht)
+            leaves = h[h.get_leaf_index(0):]
+            for j in xrange(self.num_segments - len(leaves)):
+                leaves.append(None)
+
+            assert len(leaves) >= self.num_segments
+            self.blockhashes[i] = leaves
+            # This list will now be the leaves that were set during the
+            # initial upload + enough empty hashes to make it a
+            # power-of-two. If we exceed a power of two boundary, we
+            # should be encoding the file over again, and should not be
+            # here. So, we have
+            #assert len(self.blockhashes[i]) == \
+            #    hashtree.roundup_pow2(self.num_segments), \
+            #        len(self.blockhashes[i])
+            # XXX: Except this doesn't work. Figure out why.
+
+        # These are filled in later, after we've modified the block hash
+        # tree suitably.
+        self.sharehash_leaves = None # eventually [sharehashes]
+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
+                              # validate the share]
+
+        self.log("Starting push")
+
+        self._state = PUSHING_BLOCKS_STATE
+        self._push()
+
+        return self.done_deferred
+
+
     def publish(self, newdata):
         """Publish the filenode's current contents.  Returns a Deferred that
         fires (with None) when the publish has done as much work as it's ever
@@ -125,17 +343,19 @@ class Publish:
         simultaneous write.
         """
 
-        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
-        # 2: perform peer selection, get candidate servers
-        #  2a: send queries to n+epsilon servers, to determine current shares
-        #  2b: based upon responses, create target map
-        # 3: send slot_testv_and_readv_and_writev messages
-        # 4: as responses return, update share-dispatch table
-        # 4a: may need to run recovery algorithm
-        # 5: when enough responses are back, we're done
-
-        self.log("starting publish, datalen is %s" % len(newdata))
-        self._status.set_size(len(newdata))
+        # 0. Setup encoding parameters, encoder, and other such things.
+        # 1. Encrypt, encode, and publish segments.
+        assert IMutableUploadable.providedBy(newdata)
+
+        self.data = newdata
+        self.datalength = newdata.get_size()
+        #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
+        #    self._version = MDMF_VERSION
+        #else:
+        #    self._version = SDMF_VERSION
+
+        self.log("starting publish, datalen is %s" % self.datalength)
+        self._status.set_size(self.datalength)
         self._status.set_status("Started")
         self._started = time.time()
 
@@ -184,9 +404,8 @@ class Publish:
         self.full_peerlist = full_peerlist # for use later, immutable
         self.bad_peers = set() # peerids who have errbacked/refused requests
 
-        self.newdata = newdata
-        self.salt = os.urandom(16)
-
+        # This will set self.segment_size, self.num_segments, and
+        # self.fec.
         self.setup_encoding_parameters()
 
         # if we experience any surprises (writes which were rejected because
@@ -195,10 +414,6 @@ class Publish:
         # end of the publish process.
         self.surprised = False
 
-        # as a failsafe, refuse to iterate through self.loop more than a
-        # thousand times.
-        self.looplimit = 1000
-
         # we keep track of three tables. The first is our goal: which share
         # we want to see on which servers. This is initially populated by the
         # existing servermap.
@@ -222,6 +437,9 @@ class Publish:
 
         self.bad_share_checkstrings = {}
 
+        # This is set at the last step of the publishing process.
+        self.versioninfo = ""
+
         # we use the servermap to populate the initial goal: this way we will
         # try to update each existing share in place.
         for (peerid, shnum) in self._servermap.servermap:
@@ -235,87 +453,439 @@ class Publish:
             self.bad_share_checkstrings[key] = old_checkstring
             self.connections[peerid] = self._servermap.connections[peerid]
 
-        # create the shares. We'll discard these as they are delivered. SDMF:
-        # we're allowed to hold everything in memory.
+        # TODO: Make this part do peer selection.
+        self.update_goal()
+        self.writers = {}
+        if self._version == MDMF_VERSION:
+            writer_class = MDMFSlotWriteProxy
+        else:
+            writer_class = SDMFSlotWriteProxy
+
+        # For each (peerid, shnum) in self.goal, we make a
+        # write proxy for that peer. We'll use this to write
+        # shares to the peer.
+        for key in self.goal:
+            peerid, shnum = key
+            write_enabler = self._node.get_write_enabler(peerid)
+            renew_secret = self._node.get_renewal_secret(peerid)
+            cancel_secret = self._node.get_cancel_secret(peerid)
+            secrets = (write_enabler, renew_secret, cancel_secret)
 
+            self.writers[shnum] =  writer_class(shnum,
+                                                self.connections[peerid],
+                                                self._storage_index,
+                                                secrets,
+                                                self._new_seqnum,
+                                                self.required_shares,
+                                                self.total_shares,
+                                                self.segment_size,
+                                                self.datalength)
+            self.writers[shnum].peerid = peerid
+            if (peerid, shnum) in self._servermap.servermap:
+                old_versionid, old_timestamp = self._servermap.servermap[key]
+                (old_seqnum, old_root_hash, old_salt, old_segsize,
+                 old_datalength, old_k, old_N, old_prefix,
+                 old_offsets_tuple) = old_versionid
+                self.writers[shnum].set_checkstring(old_seqnum,
+                                                    old_root_hash,
+                                                    old_salt)
+            elif (peerid, shnum) in self.bad_share_checkstrings:
+                old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
+                self.writers[shnum].set_checkstring(old_checkstring)
+
+        # Our remote shares will not have a complete checkstring until
+        # after we are done writing share data and have started to write
+        # blocks. In the meantime, we need to know what to look for when
+        # writing, so that we can detect UncoordinatedWriteErrors.
+        self._checkstring = self.writers.values()[0].get_checkstring()
+
+        # Now, we start pushing shares.
         self._status.timings["setup"] = time.time() - self._started
-        d = self._encrypt_and_encode()
-        d.addCallback(self._generate_shares)
-        def _start_pushing(res):
-            self._started_pushing = time.time()
-            return res
-        d.addCallback(_start_pushing)
-        d.addCallback(self.loop) # trigger delivery
-        d.addErrback(self._fatal_error)
+        # First, we encrypt, encode, and publish the shares that we need
+        # to encrypt, encode, and publish.
+
+        # This will eventually hold the block hash chain for each share
+        # that we publish. We define it this way so that empty publishes
+        # will still have something to write to the remote slot.
+        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
+        for i in xrange(self.total_shares):
+            blocks = self.blockhashes[i]
+            for j in xrange(self.num_segments):
+                blocks.append(None)
+        self.sharehash_leaves = None # eventually [sharehashes]
+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
+                              # validate the share]
+
+        self.log("Starting push")
+
+        self._state = PUSHING_BLOCKS_STATE
+        self._push()
 
         return self.done_deferred
 
-    def setup_encoding_parameters(self):
-        segment_size = len(self.newdata)
+
+    def _update_status(self):
+        self._status.set_status("Sending Shares: %d placed out of %d, "
+                                "%d messages outstanding" %
+                                (len(self.placed),
+                                 len(self.goal),
+                                 len(self.outstanding)))
+        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
+
+    def setup_encoding_parameters(self, offset=0):
+        if self._version == MDMF_VERSION:
+            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
+        else:
+            segment_size = self.datalength # SDMF is only one segment
         # this must be a multiple of self.required_shares
         segment_size = mathutil.next_multiple(segment_size,
                                               self.required_shares)
         self.segment_size = segment_size
+
+        # Calculate the starting segment for the upload.
         if segment_size:
-            self.num_segments = mathutil.div_ceil(len(self.newdata),
+            # We use div_ceil instead of integer division here because
+            # it is semantically correct.
+            # If datalength isn't an even multiple of segment_size, but
+            # is larger than segment_size, datalength // segment_size
+            # will be the largest number such that num <= datalength and
+            # num % segment_size == 0. But that's not what we want,
+            # because it ignores the extra data. div_ceil will give us
+            # the right number of segments for the data that we're
+            # given.
+            self.num_segments = mathutil.div_ceil(self.datalength,
                                                   segment_size)
+
+            self.starting_segment = offset // segment_size
+
         else:
             self.num_segments = 0
-        assert self.num_segments in [0, 1,] # SDMF restrictions
+            self.starting_segment = 0
 
-    def _fatal_error(self, f):
-        self.log("error during loop", failure=f, level=log.UNUSUAL)
-        self._done(f)
 
-    def _update_status(self):
-        self._status.set_status("Sending Shares: %d placed out of %d, "
-                                "%d messages outstanding" %
-                                (len(self.placed),
-                                 len(self.goal),
-                                 len(self.outstanding)))
-        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+        self.log("building encoding parameters for file")
+        self.log("got segsize %d" % self.segment_size)
+        self.log("got %d segments" % self.num_segments)
 
-    def loop(self, ignored=None):
-        self.log("entering loop", level=log.NOISY)
-        if not self._running:
-            return
+        if self._version == SDMF_VERSION:
+            assert self.num_segments in (0, 1) # SDMF
+        # calculate the tail segment size.
+
+        if segment_size and self.datalength:
+            self.tail_segment_size = self.datalength % segment_size
+            self.log("got tail segment size %d" % self.tail_segment_size)
+        else:
+            self.tail_segment_size = 0
+
+        if self.tail_segment_size == 0 and segment_size:
+            # The tail segment is the same size as the other segments.
+            self.tail_segment_size = segment_size
 
-        self.looplimit -= 1
-        if self.looplimit <= 0:
-            raise LoopLimitExceededError("loop limit exceeded")
+        # Make FEC encoders
+        fec = codec.CRSEncoder()
+        fec.set_params(self.segment_size,
+                       self.required_shares, self.total_shares)
+        self.piece_size = fec.get_block_size()
+        self.fec = fec
 
-        if self.surprised:
-            # don't send out any new shares, just wait for the outstanding
-            # ones to be retired.
-            self.log("currently surprised, so don't send any new shares",
-                     level=log.NOISY)
+        if self.tail_segment_size == self.segment_size:
+            self.tail_fec = self.fec
         else:
-            self.update_goal()
-            # how far are we from our goal?
-            needed = self.goal - self.placed - self.outstanding
-            self._update_status()
-
-            if needed:
-                # we need to send out new shares
-                self.log(format="need to send %(needed)d new shares",
-                         needed=len(needed), level=log.NOISY)
-                self._send_shares(needed)
-                return
-
-        if self.outstanding:
-            # queries are still pending, keep waiting
-            self.log(format="%(outstanding)d queries still outstanding",
-                     outstanding=len(self.outstanding),
-                     level=log.NOISY)
-            return
+            tail_fec = codec.CRSEncoder()
+            tail_fec.set_params(self.tail_segment_size,
+                                self.required_shares,
+                                self.total_shares)
+            self.tail_fec = tail_fec
+
+        self._current_segment = self.starting_segment
+        self.end_segment = self.num_segments - 1
+        # Now figure out where the last segment should be.
+        if self.data.get_size() != self.datalength:
+            # We're updating a few segments in the middle of a mutable
+            # file, so we don't want to republish the whole thing.
+            # (we don't have enough data to do that even if we wanted
+            # to)
+            end = self.data.get_size()
+            self.end_segment = end // segment_size
+            if end % segment_size == 0:
+                self.end_segment -= 1
+
+        self.log("got start segment %d" % self.starting_segment)
+        self.log("got end segment %d" % self.end_segment)
+
+
+    def _push(self, ignored=None):
+        """
+        I manage state transitions. In particular, I see that we still
+        have a good enough number of writers to complete the upload
+        successfully.
+        """
+        # Can we still successfully publish this file?
+        # TODO: Keep track of outstanding queries before aborting the
+        #       process.
+        if len(self.writers) < self.required_shares or self.surprised:
+            return self._failure()
+
+        # Figure out what we need to do next. Each of these needs to
+        # return a deferred so that we don't block execution when this
+        # is first called in the upload method.
+        if self._state == PUSHING_BLOCKS_STATE:
+            return self.push_segment(self._current_segment)
+
+        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
+            return self.push_everything_else()
+
+        # If we make it to this point, we were successful in placing the
+        # file.
+        return self._done()
+
+
+    def push_segment(self, segnum):
+        if self.num_segments == 0 and self._version == SDMF_VERSION:
+            self._add_dummy_salts()
+
+        if segnum > self.end_segment:
+            # We don't have any more segments to push.
+            self._state = PUSHING_EVERYTHING_ELSE_STATE
+            return self._push()
+
+        d = self._encode_segment(segnum)
+        d.addCallback(self._push_segment, segnum)
+        def _increment_segnum(ign):
+            self._current_segment += 1
+        # XXX: I don't think we need to do addBoth here -- any errBacks
+        # should be handled within push_segment.
+        d.addCallback(_increment_segnum)
+        d.addCallback(self._turn_barrier)
+        d.addCallback(self._push)
+        d.addErrback(self._failure)
+
+
+    def _turn_barrier(self, result):
+        """
+        I help the publish process avoid the recursion limit issues
+        described in #237.
+        """
+        return fireEventually(result)
+
+
+    def _add_dummy_salts(self):
+        """
+        SDMF files need a salt even if they're empty, or the signature
+        won't make sense. This method adds a dummy salt to each of our
+        SDMF writers so that they can write the signature later.
+        """
+        salt = os.urandom(16)
+        assert self._version == SDMF_VERSION
+
+        for writer in self.writers.itervalues():
+            writer.put_salt(salt)
+
+
+    def _encode_segment(self, segnum):
+        """
+        I encrypt and encode the segment segnum.
+        """
+        started = time.time()
+
+        if segnum + 1 == self.num_segments:
+            segsize = self.tail_segment_size
+        else:
+            segsize = self.segment_size
+
+
+        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
+        data = self.data.read(segsize)
+        # XXX: This is dumb. Why return a list?
+        data = "".join(data)
+
+        assert len(data) == segsize, len(data)
+
+        salt = os.urandom(16)
+
+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
+        self._status.set_status("Encrypting")
+        enc = AES(key)
+        crypttext = enc.process(data)
+        assert len(crypttext) == len(data)
 
-        # no queries outstanding, no placements needed: we're done
-        self.log("no queries outstanding, no placements needed: done",
-                 level=log.OPERATIONAL)
         now = time.time()
-        elapsed = now - self._started_pushing
-        self._status.timings["push"] = elapsed
-        return self._done(None)
+        self._status.timings["encrypt"] = now - started
+        started = now
+
+        # now apply FEC
+        if segnum + 1 == self.num_segments:
+            fec = self.tail_fec
+        else:
+            fec = self.fec
+
+        self._status.set_status("Encoding")
+        crypttext_pieces = [None] * self.required_shares
+        piece_size = fec.get_block_size()
+        for i in range(len(crypttext_pieces)):
+            offset = i * piece_size
+            piece = crypttext[offset:offset+piece_size]
+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
+            crypttext_pieces[i] = piece
+            assert len(piece) == piece_size
+        d = fec.encode(crypttext_pieces)
+        def _done_encoding(res):
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
+            return (res, salt)
+        d.addCallback(_done_encoding)
+        return d
+
+
+    def _push_segment(self, encoded_and_salt, segnum):
+        """
+        I push (data, salt) as segment number segnum.
+        """
+        results, salt = encoded_and_salt
+        shares, shareids = results
+        self._status.set_status("Pushing segment")
+        for i in xrange(len(shares)):
+            sharedata = shares[i]
+            shareid = shareids[i]
+            if self._version == MDMF_VERSION:
+                hashed = salt + sharedata
+            else:
+                hashed = sharedata
+            block_hash = hashutil.block_hash(hashed)
+            self.blockhashes[shareid][segnum] = block_hash
+            # find the writer for this share
+            writer = self.writers[shareid]
+            writer.put_block(sharedata, segnum, salt)
+
+
+    def push_everything_else(self):
+        """
+        I put everything else associated with a share.
+        """
+        self._pack_started = time.time()
+        self.push_encprivkey()
+        self.push_blockhashes()
+        self.push_sharehashes()
+        self.push_toplevel_hashes_and_signature()
+        d = self.finish_publishing()
+        def _change_state(ignored):
+            self._state = DONE_STATE
+        d.addCallback(_change_state)
+        d.addCallback(self._push)
+        return d
+
+
+    def push_encprivkey(self):
+        encprivkey = self._encprivkey
+        self._status.set_status("Pushing encrypted private key")
+        for writer in self.writers.itervalues():
+            writer.put_encprivkey(encprivkey)
+
+
+    def push_blockhashes(self):
+        self.sharehash_leaves = [None] * len(self.blockhashes)
+        self._status.set_status("Building and pushing block hash tree")
+        for shnum, blockhashes in self.blockhashes.iteritems():
+            t = hashtree.HashTree(blockhashes)
+            self.blockhashes[shnum] = list(t)
+            # set the leaf for future use.
+            self.sharehash_leaves[shnum] = t[0]
+
+            writer = self.writers[shnum]
+            writer.put_blockhashes(self.blockhashes[shnum])
+
+
+    def push_sharehashes(self):
+        self._status.set_status("Building and pushing share hash chain")
+        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
+        for shnum in xrange(len(self.sharehash_leaves)):
+            needed_indices = share_hash_tree.needed_hashes(shnum)
+            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
+                                             for i in needed_indices] )
+            writer = self.writers[shnum]
+            writer.put_sharehashes(self.sharehashes[shnum])
+        self.root_hash = share_hash_tree[0]
+
+
+    def push_toplevel_hashes_and_signature(self):
+        # We need to to three things here:
+        #   - Push the root hash and salt hash
+        #   - Get the checkstring of the resulting layout; sign that.
+        #   - Push the signature
+        self._status.set_status("Pushing root hashes and signature")
+        for shnum in xrange(self.total_shares):
+            writer = self.writers[shnum]
+            writer.put_root_hash(self.root_hash)
+        self._update_checkstring()
+        self._make_and_place_signature()
+
+
+    def _update_checkstring(self):
+        """
+        After putting the root hash, MDMF files will have the
+        checkstring written to the storage server. This means that we
+        can update our copy of the checkstring so we can detect
+        uncoordinated writes. SDMF files will have the same checkstring,
+        so we need not do anything.
+        """
+        self._checkstring = self.writers.values()[0].get_checkstring()
+
+
+    def _make_and_place_signature(self):
+        """
+        I create and place the signature.
+        """
+        started = time.time()
+        self._status.set_status("Signing prefix")
+        signable = self.writers[0].get_signable()
+        self.signature = self._privkey.sign(signable)
+
+        for (shnum, writer) in self.writers.iteritems():
+            writer.put_signature(self.signature)
+        self._status.timings['sign'] = time.time() - started
+
+
+    def finish_publishing(self):
+        # We're almost done -- we just need to put the verification key
+        # and the offsets
+        started = time.time()
+        self._status.set_status("Pushing shares")
+        self._started_pushing = started
+        ds = []
+        verification_key = self._pubkey.serialize()
+
+
+        # TODO: Bad, since we remove from this same dict. We need to
+        # make a copy, or just use a non-iterated value.
+        for (shnum, writer) in self.writers.iteritems():
+            writer.put_verification_key(verification_key)
+            d = writer.finish_publishing()
+            # Add the (peerid, shnum) tuple to our list of outstanding
+            # queries. This gets used by _loop if some of our queries
+            # fail to place shares.
+            self.outstanding.add((writer.peerid, writer.shnum))
+            d.addCallback(self._got_write_answer, writer, started)
+            d.addErrback(self._connection_problem, writer)
+            ds.append(d)
+        self._record_verinfo()
+        self._status.timings['pack'] = time.time() - started
+        return defer.DeferredList(ds)
+
+
+    def _record_verinfo(self):
+        self.versioninfo = self.writers.values()[0].get_verinfo()
+
+
+    def _connection_problem(self, f, writer):
+        """
+        We ran into a connection problem while working with writer, and
+        need to deal with that.
+        """
+        self.log("found problem: %s" % str(f))
+        self._last_failure = f
+        del(self.writers[writer.shnum])
+
 
     def log_goal(self, goal, message=""):
         logmsg = [message]
@@ -398,286 +968,52 @@ class Publish:
             self.log_goal(self.goal, "after update: ")
 
 
+    def _got_write_answer(self, answer, writer, started):
+        if not answer:
+            # SDMF writers only pretend to write when readers set their
+            # blocks, salts, and so on -- they actually just write once,
+            # at the end of the upload process. In fake writes, they
+            # return defer.succeed(None). If we see that, we shouldn't
+            # bother checking it.
+            return
 
-    def _encrypt_and_encode(self):
-        # this returns a Deferred that fires with a list of (sharedata,
-        # sharenum) tuples. TODO: cache the ciphertext, only produce the
-        # shares that we care about.
-        self.log("_encrypt_and_encode")
-
-        self._status.set_status("Encrypting")
-        started = time.time()
-
-        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
-        enc = AES(key)
-        crypttext = enc.process(self.newdata)
-        assert len(crypttext) == len(self.newdata)
+        peerid = writer.peerid
+        lp = self.log("_got_write_answer from %s, share %d" %
+                      (idlib.shortnodeid_b2a(peerid), writer.shnum))
 
         now = time.time()
-        self._status.timings["encrypt"] = now - started
-        started = now
-
-        # now apply FEC
-
-        self._status.set_status("Encoding")
-        fec = codec.CRSEncoder()
-        fec.set_params(self.segment_size,
-                       self.required_shares, self.total_shares)
-        piece_size = fec.get_block_size()
-        crypttext_pieces = [None] * self.required_shares
-        for i in range(len(crypttext_pieces)):
-            offset = i * piece_size
-            piece = crypttext[offset:offset+piece_size]
-            piece = piece + "\x00"*(piece_size - len(piece)) # padding
-            crypttext_pieces[i] = piece
-            assert len(piece) == piece_size
-
-        d = fec.encode(crypttext_pieces)
-        def _done_encoding(res):
-            elapsed = time.time() - started
-            self._status.timings["encode"] = elapsed
-            return res
-        d.addCallback(_done_encoding)
-        return d
-
-    def _generate_shares(self, shares_and_shareids):
-        # this sets self.shares and self.root_hash
-        self.log("_generate_shares")
-        self._status.set_status("Generating Shares")
-        started = time.time()
-
-        # we should know these by now
-        privkey = self._privkey
-        encprivkey = self._encprivkey
-        pubkey = self._pubkey
-
-        (shares, share_ids) = shares_and_shareids
-
-        assert len(shares) == len(share_ids)
-        assert len(shares) == self.total_shares
-        all_shares = {}
-        block_hash_trees = {}
-        share_hash_leaves = [None] * len(shares)
-        for i in range(len(shares)):
-            share_data = shares[i]
-            shnum = share_ids[i]
-            all_shares[shnum] = share_data
-
-            # build the block hash tree. SDMF has only one leaf.
-            leaves = [hashutil.block_hash(share_data)]
-            t = hashtree.HashTree(leaves)
-            block_hash_trees[shnum] = list(t)
-            share_hash_leaves[shnum] = t[0]
-        for leaf in share_hash_leaves:
-            assert leaf is not None
-        share_hash_tree = hashtree.HashTree(share_hash_leaves)
-        share_hash_chain = {}
-        for shnum in range(self.total_shares):
-            needed_hashes = share_hash_tree.needed_hashes(shnum)
-            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
-                                              for i in needed_hashes ] )
-        root_hash = share_hash_tree[0]
-        assert len(root_hash) == 32
-        self.log("my new root_hash is %s" % base32.b2a(root_hash))
-        self._new_version_info = (self._new_seqnum, root_hash, self.salt)
-
-        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
-                             self.required_shares, self.total_shares,
-                             self.segment_size, len(self.newdata))
-
-        # now pack the beginning of the share. All shares are the same up
-        # to the signature, then they have divergent share hash chains,
-        # then completely different block hash trees + salt + share data,
-        # then they all share the same encprivkey at the end. The sizes
-        # of everything are the same for all shares.
-
-        sign_started = time.time()
-        signature = privkey.sign(prefix)
-        self._status.timings["sign"] = time.time() - sign_started
-
-        verification_key = pubkey.serialize()
-
-        final_shares = {}
-        for shnum in range(self.total_shares):
-            final_share = pack_share(prefix,
-                                     verification_key,
-                                     signature,
-                                     share_hash_chain[shnum],
-                                     block_hash_trees[shnum],
-                                     all_shares[shnum],
-                                     encprivkey)
-            final_shares[shnum] = final_share
-        elapsed = time.time() - started
-        self._status.timings["pack"] = elapsed
-        self.shares = final_shares
-        self.root_hash = root_hash
-
-        # we also need to build up the version identifier for what we're
-        # pushing. Extract the offsets from one of our shares.
-        assert final_shares
-        offsets = unpack_header(final_shares.values()[0])[-1]
-        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
-        verinfo = (self._new_seqnum, root_hash, self.salt,
-                   self.segment_size, len(self.newdata),
-                   self.required_shares, self.total_shares,
-                   prefix, offsets_tuple)
-        self.versioninfo = verinfo
-
-
-
-    def _send_shares(self, needed):
-        self.log("_send_shares")
-
-        # we're finally ready to send out our shares. If we encounter any
-        # surprises here, it's because somebody else is writing at the same
-        # time. (Note: in the future, when we remove the _query_peers() step
-        # and instead speculate about [or remember] which shares are where,
-        # surprises here are *not* indications of UncoordinatedWriteError,
-        # and we'll need to respond to them more gracefully.)
-
-        # needed is a set of (peerid, shnum) tuples. The first thing we do is
-        # organize it by peerid.
-
-        peermap = DictOfSets()
-        for (peerid, shnum) in needed:
-            peermap.add(peerid, shnum)
-
-        # the next thing is to build up a bunch of test vectors. The
-        # semantics of Publish are that we perform the operation if the world
-        # hasn't changed since the ServerMap was constructed (more or less).
-        # For every share we're trying to place, we create a test vector that
-        # tests to see if the server*share still corresponds to the
-        # map.
-
-        all_tw_vectors = {} # maps peerid to tw_vectors
-        sm = self._servermap.servermap
-
-        for key in needed:
-            (peerid, shnum) = key
-
-            if key in sm:
-                # an old version of that share already exists on the
-                # server, according to our servermap. We will create a
-                # request that attempts to replace it.
-                old_versionid, old_timestamp = sm[key]
-                (old_seqnum, old_root_hash, old_salt, old_segsize,
-                 old_datalength, old_k, old_N, old_prefix,
-                 old_offsets_tuple) = old_versionid
-                old_checkstring = pack_checkstring(old_seqnum,
-                                                   old_root_hash,
-                                                   old_salt)
-                testv = (0, len(old_checkstring), "eq", old_checkstring)
-
-            elif key in self.bad_share_checkstrings:
-                old_checkstring = self.bad_share_checkstrings[key]
-                testv = (0, len(old_checkstring), "eq", old_checkstring)
-
-            else:
-                # add a testv that requires the share not exist
-
-                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
-                # constraints are handled. If the same object is referenced
-                # multiple times inside the arguments, foolscap emits a
-                # 'reference' token instead of a distinct copy of the
-                # argument. The bug is that these 'reference' tokens are not
-                # accepted by the inbound constraint code. To work around
-                # this, we need to prevent python from interning the
-                # (constant) tuple, by creating a new copy of this vector
-                # each time.
-
-                # This bug is fixed in foolscap-0.2.6, and even though this
-                # version of Tahoe requires foolscap-0.3.1 or newer, we are
-                # supposed to be able to interoperate with older versions of
-                # Tahoe which are allowed to use older versions of foolscap,
-                # including foolscap-0.2.5 . In addition, I've seen other
-                # foolscap problems triggered by 'reference' tokens (see #541
-                # for details). So we must keep this workaround in place.
-
-                #testv = (0, 1, 'eq', "")
-                testv = tuple([0, 1, 'eq', ""])
-
-            testvs = [testv]
-            # the write vector is simply the share
-            writev = [(0, self.shares[shnum])]
-
-            if peerid not in all_tw_vectors:
-                all_tw_vectors[peerid] = {}
-                # maps shnum to (testvs, writevs, new_length)
-            assert shnum not in all_tw_vectors[peerid]
-
-            all_tw_vectors[peerid][shnum] = (testvs, writev, None)
-
-        # we read the checkstring back from each share, however we only use
-        # it to detect whether there was a new share that we didn't know
-        # about. The success or failure of the write will tell us whether
-        # there was a collision or not. If there is a collision, the first
-        # thing we'll do is update the servermap, which will find out what
-        # happened. We could conceivably reduce a roundtrip by using the
-        # readv checkstring to populate the servermap, but really we'd have
-        # to read enough data to validate the signatures too, so it wouldn't
-        # be an overall win.
-        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
-
-        # ok, send the messages!
-        self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
-        started = time.time()
-        for (peerid, tw_vectors) in all_tw_vectors.items():
-
-            write_enabler = self._node.get_write_enabler(peerid)
-            renew_secret = self._node.get_renewal_secret(peerid)
-            cancel_secret = self._node.get_cancel_secret(peerid)
-            secrets = (write_enabler, renew_secret, cancel_secret)
-            shnums = tw_vectors.keys()
-
-            for shnum in shnums:
-                self.outstanding.add( (peerid, shnum) )
+        elapsed = now - started
 
-            d = self._do_testreadwrite(peerid, secrets,
-                                       tw_vectors, read_vector)
-            d.addCallbacks(self._got_write_answer, self._got_write_error,
-                           callbackArgs=(peerid, shnums, started),
-                           errbackArgs=(peerid, shnums, started))
-            # tolerate immediate errback, like with DeadReferenceError
-            d.addBoth(fireEventually)
-            d.addCallback(self.loop)
-            d.addErrback(self._fatal_error)
+        self._status.add_per_server_time(peerid, elapsed)
 
-        self._update_status()
-        self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
-
-    def _do_testreadwrite(self, peerid, secrets,
-                          tw_vectors, read_vector):
-        storage_index = self._storage_index
-        ss = self.connections[peerid]
-
-        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
-        d = ss.callRemote("slot_testv_and_readv_and_writev",
-                          storage_index,
-                          secrets,
-                          tw_vectors,
-                          read_vector)
-        return d
+        wrote, read_data = answer
 
-    def _got_write_answer(self, answer, peerid, shnums, started):
-        lp = self.log("_got_write_answer from %s" %
-                      idlib.shortnodeid_b2a(peerid))
-        for shnum in shnums:
-            self.outstanding.discard( (peerid, shnum) )
+        surprise_shares = set(read_data.keys()) - set([writer.shnum])
 
-        now = time.time()
-        elapsed = now - started
-        self._status.add_per_server_time(peerid, elapsed)
+        # We need to remove from surprise_shares any shares that we are
+        # knowingly also writing to that peer from other writers.
 
-        wrote, read_data = answer
+        # TODO: Precompute this.
+        known_shnums = [x.shnum for x in self.writers.values()
+                        if x.peerid == peerid]
+        surprise_shares -= set(known_shnums)
+        self.log("found the following surprise shares: %s" %
+                 str(surprise_shares))
 
-        surprise_shares = set(read_data.keys()) - set(shnums)
+        # Now surprise shares contains all of the shares that we did not
+        # expect to be there.
 
         surprised = False
         for shnum in surprise_shares:
             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
             checkstring = read_data[shnum][0]
-            their_version_info = unpack_checkstring(checkstring)
-            if their_version_info == self._new_version_info:
+            # What we want to do here is to see if their (seqnum,
+            # roothash, salt) is the same as our (seqnum, roothash,
+            # salt), or the equivalent for MDMF. The best way to do this
+            # is to store a packed representation of our checkstring
+            # somewhere, then not bother unpacking the other
+            # checkstring.
+            if checkstring == self._checkstring:
                 # they have the right share, somehow
 
                 if (peerid,shnum) in self.goal:
@@ -756,7 +1092,7 @@ class Publish:
             self.log("our testv failed, so the write did not happen",
                      parent=lp, level=log.WEIRD, umid="8sc26g")
             self.surprised = True
-            self.bad_peers.add(peerid) # don't ask them again
+            self.bad_peers.add(writer) # don't ask them again
             # use the checkstring to add information to the log message
             for (shnum,readv) in read_data.items():
                 checkstring = readv[0]
@@ -778,53 +1114,237 @@ class Publish:
                 # if expected_version==None, then we didn't expect to see a
                 # share on that peer, and the 'surprise_shares' clause above
                 # will have logged it.
-            # self.loop() will take care of finding new homes
             return
 
-        for shnum in shnums:
-            self.placed.add( (peerid, shnum) )
-            # and update the servermap
-            self._servermap.add_new_share(peerid, shnum,
+        # and update the servermap
+        # self.versioninfo is set during the last phase of publishing.
+        # If we get there, we know that responses correspond to placed
+        # shares, and can safely execute these statements.
+        if self.versioninfo:
+            self.log("wrote successfully: adding new share to servermap")
+            self._servermap.add_new_share(peerid, writer.shnum,
                                           self.versioninfo, started)
-
-        # self.loop() will take care of checking to see if we're done
-        return
-
-    def _got_write_error(self, f, peerid, shnums, started):
-        for shnum in shnums:
-            self.outstanding.discard( (peerid, shnum) )
-        self.bad_peers.add(peerid)
-        if self._first_write_error is None:
-            self._first_write_error = f
-        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
-                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
-                 failure=f,
-                 level=log.UNUSUAL)
-        # self.loop() will take care of checking to see if we're done
+            self.placed.add( (peerid, writer.shnum) )
+        self._update_status()
+        # the next method in the deferred chain will check to see if
+        # we're done and successful.
         return
 
 
-    def _done(self, res):
+    def _done(self):
         if not self._running:
             return
         self._running = False
         now = time.time()
         self._status.timings["total"] = now - self._started
+
+        elapsed = now - self._started_pushing
+        self._status.timings['push'] = elapsed
+
         self._status.set_active(False)
-        if isinstance(res, failure.Failure):
-            self.log("Publish done, with failure", failure=res,
-                     level=log.WEIRD, umid="nRsR9Q")
-            self._status.set_status("Failed")
-        elif self.surprised:
-            self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
-            self._status.set_status("UncoordinatedWriteError")
-            # deliver a failure
-            res = failure.Failure(UncoordinatedWriteError())
-            # TODO: recovery
+        self.log("Publish done, success")
+        self._status.set_status("Finished")
+        self._status.set_progress(1.0)
+        # Get k and segsize, then give them to the caller.
+        hints = {}
+        hints['segsize'] = self.segment_size
+        hints['k'] = self.required_shares
+        self._node.set_downloader_hints(hints)
+        eventually(self.done_deferred.callback, None)
+
+    def _failure(self, f=None):
+        if f:
+            self._last_failure = f
+
+        if not self.surprised:
+            # We ran out of servers
+            msg = "Publish ran out of good servers"
+            if self._last_failure:
+                msg += ", last failure was: %s" % str(self._last_failure)
+            self.log(msg)
+            e = NotEnoughServersError(msg)
+
         else:
-            self.log("Publish done, success")
-            self._status.set_status("Finished")
-            self._status.set_progress(1.0)
-        eventually(self.done_deferred.callback, res)
+            # We ran into shares that we didn't recognize, which means
+            # that we need to return an UncoordinatedWriteError.
+            self.log("Publish failed with UncoordinatedWriteError")
+            e = UncoordinatedWriteError()
+        f = failure.Failure(e)
+        eventually(self.done_deferred.callback, f)
+
+
+class MutableFileHandle:
+    """
+    I am a mutable uploadable built around a filehandle-like object,
+    usually either a StringIO instance or a handle to an actual file.
+    """
+    implements(IMutableUploadable)
+
+    def __init__(self, filehandle):
+        # The filehandle is defined as a generally file-like object that
+        # has these two methods. We don't care beyond that.
+        assert hasattr(filehandle, "read")
+        assert hasattr(filehandle, "close")
+
+        self._filehandle = filehandle
+        # We must start reading at the beginning of the file, or we risk
+        # encountering errors when the data read does not match the size
+        # reported to the uploader.
+        self._filehandle.seek(0)
+
+        # We have not yet read anything, so our position is 0.
+        self._marker = 0
+
+
+    def get_size(self):
+        """
+        I return the amount of data in my filehandle.
+        """
+        if not hasattr(self, "_size"):
+            old_position = self._filehandle.tell()
+            # Seek to the end of the file by seeking 0 bytes from the
+            # file's end
+            self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
+            self._size = self._filehandle.tell()
+            # Restore the previous position, in case this was called
+            # after a read.
+            self._filehandle.seek(old_position)
+            assert self._filehandle.tell() == old_position
+
+        assert hasattr(self, "_size")
+        return self._size
+
+
+    def pos(self):
+        """
+        I return the position of my read marker -- i.e., how much data I
+        have already read and returned to callers.
+        """
+        return self._marker
 
 
+    def read(self, length):
+        """
+        I return some data (up to length bytes) from my filehandle.
+
+        In most cases, I return length bytes, but sometimes I won't --
+        for example, if I am asked to read beyond the end of a file, or
+        an error occurs.
+        """
+        results = self._filehandle.read(length)
+        self._marker += len(results)
+        return [results]
+
+
+    def close(self):
+        """
+        I close the underlying filehandle. Any further operations on the
+        filehandle fail at this point.
+        """
+        self._filehandle.close()
+
+
+class MutableData(MutableFileHandle):
+    """
+    I am a mutable uploadable built around a string, which I then cast
+    into a StringIO and treat as a filehandle.
+    """
+
+    def __init__(self, s):
+        # Take a string and return a file-like uploadable.
+        assert isinstance(s, str)
+
+        MutableFileHandle.__init__(self, StringIO(s))
+
+
+class TransformingUploadable:
+    """
+    I am an IMutableUploadable that wraps another IMutableUploadable,
+    and some segments that are already on the grid. When I am called to
+    read, I handle merging of boundary segments.
+    """
+    implements(IMutableUploadable)
+
+
+    def __init__(self, data, offset, segment_size, start, end):
+        assert IMutableUploadable.providedBy(data)
+
+        self._newdata = data
+        self._offset = offset
+        self._segment_size = segment_size
+        self._start = start
+        self._end = end
+
+        self._read_marker = 0
+
+        self._first_segment_offset = offset % segment_size
+
+        num = self.log("TransformingUploadable: starting", parent=None)
+        self._log_number = num
+        self.log("got fso: %d" % self._first_segment_offset)
+        self.log("got offset: %d" % self._offset)
+
+
+    def log(self, *args, **kwargs):
+        if 'parent' not in kwargs:
+            kwargs['parent'] = self._log_number
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
+        return log.msg(*args, **kwargs)
+
+
+    def get_size(self):
+        return self._offset + self._newdata.get_size()
+
+
+    def read(self, length):
+        # We can get data from 3 sources here. 
+        #   1. The first of the segments provided to us.
+        #   2. The data that we're replacing things with.
+        #   3. The last of the segments provided to us.
+
+        # are we in state 0?
+        self.log("reading %d bytes" % length)
+
+        old_start_data = ""
+        old_data_length = self._first_segment_offset - self._read_marker
+        if old_data_length > 0:
+            if old_data_length > length:
+                old_data_length = length
+            self.log("returning %d bytes of old start data" % old_data_length)
+
+            old_data_end = old_data_length + self._read_marker
+            old_start_data = self._start[self._read_marker:old_data_end]
+            length -= old_data_length
+        else:
+            # otherwise calculations later get screwed up.
+            old_data_length = 0
+
+        # Is there enough new data to satisfy this read? If not, we need
+        # to pad the end of the data with data from our last segment.
+        old_end_length = length - \
+            (self._newdata.get_size() - self._newdata.pos())
+        old_end_data = ""
+        if old_end_length > 0:
+            self.log("reading %d bytes of old end data" % old_end_length)
+
+            # TODO: We're not explicitly checking for tail segment size
+            # here. Is that a problem?
+            old_data_offset = (length - old_end_length + \
+                               old_data_length) % self._segment_size
+            self.log("reading at offset %d" % old_data_offset)
+            old_end = old_data_offset + old_end_length
+            old_end_data = self._end[old_data_offset:old_end]
+            length -= old_end_length
+            assert length == self._newdata.get_size() - self._newdata.pos()
+
+        self.log("reading %d bytes of new data" % length)
+        new_data = self._newdata.read(length)
+        new_data = "".join(new_data)
+
+        self._read_marker += len(old_start_data + new_data + old_end_data)
+
+        return old_start_data + new_data + old_end_data
+
+    def close(self):
+        pass
-- 
2.45.2