From: Brian Warner <warner@allmydata.com>
Date: Tue, 8 Apr 2008 02:01:28 +0000 (-0700)
Subject: mutable.py: checkpointing #303 work part 2, Publish is sketched out
X-Git-Tag: allmydata-tahoe-1.1.0~248
X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/?a=commitdiff_plain;h=b53bbaa43cd6c734891a695d1ffc9989d2dcc754;p=tahoe-lafs%2Ftahoe-lafs.git

mutable.py: checkpointing #303 work part 2, Publish is sketched out
---

diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py
index b5d002f9..5fa05816 100644
--- a/src/allmydata/mutable.py
+++ b/src/allmydata/mutable.py
@@ -273,7 +273,10 @@ class ServerMap:
         # datalength, k, N, signed_prefix, offsets) tuple
         self.servermap = DictOfSets()
         self.connections = {} # maps peerid to a RemoteReference
+        self.unreachable_peers = set() # peerids that didn't respond to queries
         self.problems = [] # mostly for debugging
+        self.last_update_mode = None
+        self.last_update_time = 0
 
     def make_versionmap(self):
         """Return a dict that maps versionid to sets of (shnum, peerid,
@@ -505,6 +508,13 @@ class ServermapUpdater:
             self.num_peers_to_query = N + self.EPSILON
             initial_peers_to_query, must_query = self._build_initial_querylist()
             self.required_num_empty_peers = self.EPSILON
+
+            # TODO: also populate self._filenode._privkey
+
+            # TODO: arrange to read 3KB from one peer who is likely to hold a
+            # share, so we can avoid the latency of that extra roundtrip. 3KB
+            # would get us the encprivkey from a dirnode with up to 7
+            # entries, allowing us to make an update in 2 RTT instead of 3.
         else:
             initial_peers_to_query, must_query = self._build_initial_querylist()
 
@@ -583,6 +593,51 @@ class ServermapUpdater:
         verifier = rsa.create_verifying_key_from_string(pubkey_s)
         return verifier
 
+    def _try_to_extract_privkey(self, data, peerid, shnum):
+        try:
+            r = unpack_share(data)
+        except NeedMoreDataError, e:
+            # this share won't help us. oh well.
+            offset = e.encprivkey_offset
+            length = e.encprivkey_length
+            self.log("shnum %d on peerid %s: share was too short (%dB) "
+                     "to get the encprivkey; [%d:%d] ought to hold it" %
+                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
+                      offset, offset+length))
+            # NOTE: if uncoordinated writes are taking place, someone might
+            # change the share (and most probably move the encprivkey) before
+            # we get a chance to do one of these reads and fetch it. This
+            # will cause us to see a NotEnoughPeersError(unable to fetch
+            # privkey) instead of an UncoordinatedWriteError . This is a
+            # nuisance, but it will go away when we move to DSA-based mutable
+            # files (since the privkey will be small enough to fit in the
+            # write cap).
+
+            self._encprivkey_shares.append( (peerid, shnum, offset, length))
+            return
+
+        (seqnum, root_hash, IV, k, N, segsize, datalen,
+         pubkey, signature, share_hash_chain, block_hash_tree,
+         share_data, enc_privkey) = r
+
+        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+
+    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+        if alleged_writekey != self._writekey:
+            self.log("invalid privkey from %s shnum %d" %
+                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
+            return
+
+        # it's good
+        self.log("got valid privkey from shnum %d on peerid %s" %
+                 (shnum, idlib.shortnodeid_b2a(peerid)))
+        self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+        self._encprivkey = enc_privkey
+        self._node._populate_encprivkey(self._encprivkey)
+        self._node._populate_privkey(self._privkey)
+
     def _got_results(self, datavs, peerid, readsize, stuff, started):
         self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -671,6 +726,7 @@ class ServermapUpdater:
         self._queries_outstanding.discard(peerid)
         self._bad_peers.add(peerid)
         self._servermap.problems.append(f)
+        self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
         self._queries_completed += 1
         self._last_failure = f
 
@@ -839,6 +895,8 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
+        self._servermap.last_update_mode = self._mode
+        self._servermap.last_update_time = self._started
         # the servermap will not be touched after this
         eventually(self._done_deferred.callback, self._servermap)
 
@@ -1280,24 +1338,16 @@ class PublishStatus:
         self.active = value
 
 class Publish:
-    """I represent a single act of publishing the mutable file to the grid."""
+    """I represent a single act of publishing the mutable file to the grid. I
+    will only publish my data if the servermap I am using still represents
+    the current state of the world.
 
-    def __init__(self, filenode):
-        self._node = filenode
-        self._storage_index = self._node.get_storage_index()
-        self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
-        num = self._node._client.log("Publish(%s): starting" % prefix)
-        self._log_number = num
-        self._status = PublishStatus()
-        self._status.set_storage_index(self._storage_index)
-        self._status.set_helper(False)
-        self._status.set_progress(0.0)
-        self._status.set_active(True)
-        self._started = time.time()
+    To make the initial publish, set servermap to None.
+    """
 
-    def new__init__(self, filenode, servermap):
+    def __init__(self, filenode, servermap):
         self._node = filenode
-        self._servermap =servermap
+        self._servermap = servermap
         self._storage_index = self._node.get_storage_index()
         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
         num = self._node._client.log("Publish(%s): starting" % prefix)
@@ -1330,93 +1380,43 @@ class Publish:
         # 5: when enough responses are back, we're done
 
         self.log("starting publish, datalen is %s" % len(newdata))
-        self._status.set_size(len(newdata))
 
         self._writekey = self._node.get_writekey()
         assert self._writekey, "need write capability to publish"
 
-        old_roothash = self._node._current_roothash
-        old_seqnum = self._node._current_seqnum
-        assert old_seqnum is not None, "must read before replace"
-        self._new_seqnum = old_seqnum + 1
-
-        # read-before-replace also guarantees these fields are available
-        readkey = self._node.get_readkey()
-        required_shares = self._node.get_required_shares()
-        total_shares = self._node.get_total_shares()
+        # first, which servers will we publish to? We require that the
+        # servermap was updated in MODE_WRITE, so we can depend upon the
+        # peerlist computed by that process instead of computing our own.
+        if self._servermap:
+            assert self._servermap.last_update_mode == MODE_WRITE
+            # we will push a version that is one larger than anything present
+            # in the grid, according to the servermap.
+            self._new_seqnum = self._servermap.highest_seqnum() + 1
+        else:
+            # If we don't have a servermap, that's because we're doing the
+            # initial publish
+            self._new_seqnum = 1
+            self._servermap = ServerMap()
+
+        # having an up-to-date servermap (or using a filenode that was just
+        # created for the first time) also guarantees that the following
+        # fields are available
+        self.readkey = self._node.get_readkey()
+        self.required_shares = self._node.get_required_shares()
+        assert self.required_shares is not None
+        self.total_shares = self._node.get_total_shares()
+        assert self.total_shares is not None
         self._pubkey = self._node.get_pubkey()
-        self._status.set_encoding(required_shares, total_shares)
-
-        # these two may not be, we might have to get them from the first peer
+        assert self._pubkey
         self._privkey = self._node.get_privkey()
+        assert self._privkey
         self._encprivkey = self._node.get_encprivkey()
 
-        IV = os.urandom(16)
-
-        # we read only 1KB because all we generally care about is the seqnum
-        # ("prefix") info, so we know which shares are where. We need to get
-        # the privkey from somebody, which means reading more like 3KB, but
-        # the code in _obtain_privkey will ensure that we manage that even if
-        # we need an extra roundtrip. TODO: arrange to read 3KB from one peer
-        # who is likely to hold a share, so we can avoid the latency of that
-        # extra roundtrip. 3KB would get us the encprivkey from a dirnode
-        # with up to 7 entries, allowing us to make an update in 2 RTT
-        # instead of 3.
-        self._read_size = 1000
-        self._status.initial_read_size = self._read_size
-
-        d = defer.succeed(total_shares)
-        d.addCallback(self._query_peers)
-        d.addCallback(self._query_peers_done)
-        d.addCallback(self._obtain_privkey)
-        d.addCallback(self._obtain_privkey_done)
-
-        d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
-                      required_shares, total_shares)
-        d.addCallback(self._generate_shares, self._new_seqnum, IV)
-
-        d.addCallback(self._send_shares, IV)
-        d.addCallback(self._maybe_recover)
-        d.addCallback(self._done)
-        return d
-
-    def _query_peers(self, total_shares):
-        self.log("_query_peers")
-        self._query_peers_started = now = time.time()
-        elapsed = now - self._started
-        self._status.timings["setup"] = elapsed
-
-        storage_index = self._storage_index
-
-        # In 0.7.0, we went through extra work to make sure that we include
-        # ourselves in the peerlist, mainly to match Retrieve (which did the
-        # same thing. With the post-0.7.0 Introducer refactoring, we got rid
-        # of the include-myself flags, and standardized on the
-        # uploading/downloading node not being special.
-
-        # One nice feature of the old approach was that by putting a share on
-        # the local storage server, we're more likely to be able to retrieve
-        # a copy of the encrypted private key (even if all the old servers
-        # have gone away), so we can regenerate new shares even if we can't
-        # retrieve the old contents. This need will eventually go away when
-        # we switch to DSA-based mutable files (which store the private key
-        # in the URI).
-
-        peerlist = self._node._client.get_permuted_peers("storage",
-                                                         storage_index)
-
-        current_share_peers = DictOfSets()
-        reachable_peers = {}
-        # list of (peerid, shnum, offset, length) where the encprivkey might
-        # be found
-        self._encprivkey_shares = []
-
-        EPSILON = total_shares / 2
-        #partial_peerlist = islice(peerlist, total_shares + EPSILON)
-        partial_peerlist = peerlist[:total_shares+EPSILON]
-        self._status.peers_queried = len(partial_peerlist)
-
-        self._storage_servers = {}
+        client = self._node._client
+        full_peerlist = client.get_permuted_peers("storage",
+                                                  self._storage_index)
+        self.full_peerlist = full_peerlist # for use later, immutable
+        self.bad_peers = set() # peerids who have errbacked/refused requests
 
         started = time.time()
         dl = []
@@ -1427,130 +1427,115 @@ class Publish:
                           peerid, permutedid,
                           reachable_peers, current_share_peers, started)
             dl.append(d)
-        d = defer.DeferredList(dl, fireOnOneErrback=True)
+        d = defer.DeferredList(dl)
         d.addCallback(self._got_all_query_results,
                       total_shares, reachable_peers,
                       current_share_peers)
         # TODO: add an errback too, probably to ignore that peer
 
-        # TODO: if we can't get a privkey from these servers, consider
-        # looking farther afield. Be aware of the old 0.7.0 behavior that
-        # causes us to create our initial directory before we've connected to
-        # anyone but ourselves.. those old directories may not be
-        # retrieveable if our own server is no longer in the early part of
-        # the permuted peerlist.
-        return d
+        # we limit the segment size as usual to constrain our memory
+        # footprint. The max segsize is higher for mutable files, because we
+        # want to support dirnodes with up to 10k children, and each child
+        # uses about 330 bytes. If you actually put that much into a
+        # directory you'll be using a footprint of around 14MB, which is
+        # higher than we'd like, but it is more important right now to
+        # support large directories than to make memory usage small when you
+        # use them. Once we implement MDMF (with multiple segments), we will
+        # drop this back down, probably to 128KiB.
+        self.MAX_SEGMENT_SIZE = 3500000
 
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        # isolate the callRemote to a separate method, so tests can subclass
-        # Publish and override it
-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
-        return d
+        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
+        # this must be a multiple of self.required_shares
+        segment_size = mathutil.next_multiple(segment_size,
+                                              self.required_shares)
+        self.segment_size = segment_size
+        if segment_size:
+            self.num_segments = mathutil.div_ceil(len(self.newdata),
+                                                  segment_size)
+        else:
+            self.num_segments = 0
+        assert self.num_segments in [0, 1,] # SDMF restrictions
 
-    def _do_query(self, ss, peerid, storage_index):
-        self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
-        d = self._do_read(ss, peerid, storage_index, [], [(0, self._read_size)])
-        return d
+        self.surprised = False
 
-    def _got_query_results(self, datavs, peerid, permutedid,
-                           reachable_peers, current_share_peers, started):
+        # we keep track of three tables. The first is our goal: which share
+        # we want to see on which servers. This is initially populated by the
+        # existing servermap.
+        self.goal = set() # pairs of (peerid, shnum) tuples
 
-        lp = self.log(format="_got_query_results from %(peerid)s",
-                      peerid=idlib.shortnodeid_b2a(peerid))
-        elapsed = time.time() - started
-        self._status.add_per_server_time(peerid, "read", elapsed)
+        # the second table is our list of outstanding queries: those which
+        # are in flight and may or may not be delivered, accepted, or
+        # acknowledged. Items are added to this table when the request is
+        # sent, and removed when the response returns (or errbacks).
+        self.outstanding = set() # (peerid, shnum) tuples
 
-        assert isinstance(datavs, dict)
-        reachable_peers[peerid] = permutedid
-        if not datavs:
-            self.log("peer has no shares", parent=lp)
-        for shnum, datav in datavs.items():
-            lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
-            assert len(datav) == 1
-            data = datav[0]
-            # We want (seqnum, root_hash, IV) from all servers to know what
-            # versions we are replacing. We want the encprivkey from one
-            # server (assuming it's valid) so we know our own private key, so
-            # we can sign our update. SMDF: read the whole share from each
-            # server. TODO: later we can optimize this to transfer less data.
-
-            # we assume that we have enough data to extract the signature.
-            # TODO: if this raises NeedMoreDataError, arrange to do another
-            # read pass.
-            r = unpack_prefix_and_signature(data)
-            (seqnum, root_hash, IV, k, N, segsize, datalen,
-             pubkey_s, signature, prefix) = r
-
-            # self._pubkey is present because we require read-before-replace
-            valid = self._pubkey.verify(prefix, signature)
-            if not valid:
-                self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
-                         peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
-                         parent=lp2, level=log.WEIRD)
-                continue
-            self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
-                     shnum=shnum, seqnum=seqnum,
-                     parent=lp2, level=log.NOISY)
+        # the third is a table of successes: share which have actually been
+        # placed. These are populated when responses come back with success.
+        # When self.placed == self.goal, we're done.
+        self.placed = set() # (peerid, shnum) tuples
 
-            share = (shnum, seqnum, root_hash)
-            current_share_peers.add(shnum, (peerid, seqnum, root_hash) )
+        # we use the servermap to populate the initial goal: this way we will
+        # try to update each existing share in place.
+        for (peerid, shares) in self._servermap.servermap.items():
+            for (shnum, versionid, timestamp) in shares:
+                self.goal.add( (peerid, shnum) )
 
-            if not self._privkey:
-                self._try_to_extract_privkey(data, peerid, shnum)
+        # create the shares. We'll discard these as they are delivered. SMDF:
+        # we're allowed to hold everything in memory.
 
+        d = self._encrypt_and_encode()
+        d.addCallback(self._generate_shares)
+        d.addCallback(self.loop) # trigger delivery
 
-    def _try_to_extract_privkey(self, data, peerid, shnum):
-        try:
-            r = unpack_share(data)
-        except NeedMoreDataError, e:
-            # this share won't help us. oh well.
-            offset = e.encprivkey_offset
-            length = e.encprivkey_length
-            self.log("shnum %d on peerid %s: share was too short (%dB) "
-                     "to get the encprivkey; [%d:%d] ought to hold it" %
-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
-                      offset, offset+length))
-            # NOTE: if uncoordinated writes are taking place, someone might
-            # change the share (and most probably move the encprivkey) before
-            # we get a chance to do one of these reads and fetch it. This
-            # will cause us to see a NotEnoughPeersError(unable to fetch
-            # privkey) instead of an UncoordinatedWriteError . This is a
-            # nuisance, but it will go away when we move to DSA-based mutable
-            # files (since the privkey will be small enough to fit in the
-            # write cap).
+        return self.done_deferred
 
-            self._encprivkey_shares.append( (peerid, shnum, offset, length))
+    def loop(self):
+        self.update_goal()
+        # how far are we from our goal?
+        needed = self.goal - self.placed - self.outstanding
+
+        if needed:
+            # we need to send out new shares
+            d = self.send_shares(needed)
+            d.addCallback(self.loop)
+            d.addErrback(self._fatal_error)
             return
 
-        (seqnum, root_hash, IV, k, N, segsize, datalen,
-         pubkey, signature, share_hash_chain, block_hash_tree,
-         share_data, enc_privkey) = r
+        if self.outstanding:
+            # queries are still pending, keep waiting
+            return
 
-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+        # no queries outstanding, no placements needed: we're done
+        return self._done()
 
-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
-        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
-        if alleged_writekey != self._writekey:
-            self.log("invalid privkey from %s shnum %d" %
-                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
-            return
+    def log_goal(self, goal):
+        logmsg = []
+        for (peerid, shnum) in goal:
+            logmsg.append("sh%d to [%s]" % (shnum,
+                                            idlib.shortnodeid_b2a(peerid)))
+        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
+        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
+                 level=log.NOISY)
 
-        # it's good
-        self.log("got valid privkey from shnum %d on peerid %s" %
-                 (shnum, idlib.shortnodeid_b2a(peerid)))
-        self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
-        self._encprivkey = enc_privkey
-        self._node._populate_encprivkey(self._encprivkey)
-        self._node._populate_privkey(self._privkey)
+    def update_goal(self):
+        # first, remove any bad peers from our goal
+        self.goal = set([ (peerid, shnum)
+                          for (peerid, shnum) in self.goal
+                          if peerid not in self.bad_peers ])
 
-    def _got_all_query_results(self, res,
-                               total_shares, reachable_peers,
-                               current_share_peers):
-        self.log("_got_all_query_results")
+        # find the homeless shares:
+        homefull_shares = set([shnum for (peerid, shnum) in self.goal])
+        homeless_shares = set(range(self.total_shares)) - homefull_shares
+        homeless_shares = sorted(list(homeless_shares))
+        # place them somewhere. We prefer unused servers at the beginning of
+        # the available peer list.
+
+        if not homeless_shares:
+            return
 
-        # now that we know everything about the shares currently out there,
-        # decide where to place the new shares.
+        # if log.recording_noisy
+        if False:
+            self.log_goal(self.goal)
 
         # if an old share X is on a node, put the new share X there too.
         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
@@ -1559,160 +1544,58 @@ class Publish:
         # TODO: 2: move those shares instead of copying them, to reduce future
         #       update work
 
-        # if log.recording_noisy
-        logmsg = []
-        for shnum in range(total_shares):
-            logmsg2 = []
-            for oldplace in current_share_peers.get(shnum, []):
-                (peerid, seqnum, R) = oldplace
-                logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
-                                                seqnum, base32.b2a(R)[:4]))
-            logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
-        self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
-        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
-                 level=log.NOISY)
-
-        shares_needing_homes = range(total_shares)
-        target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
-        shares_per_peer = DictOfSets()
-        for shnum in range(total_shares):
-            for oldplace in current_share_peers.get(shnum, []):
-                (peerid, seqnum, R) = oldplace
-                if seqnum >= self._new_seqnum:
-                    self.log("the sequence number has changed unexpectedly",
-                             level=log.WEIRD)
-                    self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
-                             peerid=idlib.shortnodeid_b2a(peerid),
-                             seqnum=seqnum,
-                             new_seqnum=self._new_seqnum)
-                    raise UncoordinatedWriteError("the sequence number has changed unexpectedly")
-                target_map.add(shnum, oldplace)
-                shares_per_peer.add(peerid, shnum)
-                if shnum in shares_needing_homes:
-                    shares_needing_homes.remove(shnum)
-
-        # now choose homes for the remaining shares. We prefer peers with the
-        # fewest target shares, then peers with the lowest permuted index. If
-        # there are no shares already in place, this will assign them
-        # one-per-peer in the normal permuted order.
-        while shares_needing_homes:
-            if not reachable_peers:
-                prefix = storage.si_b2a(self._node.get_storage_index())[:5]
-                raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
-            shnum = shares_needing_homes.pop(0)
-            possible_homes = reachable_peers.keys()
-            possible_homes.sort(lambda a,b:
-                                cmp( (len(shares_per_peer.get(a, [])),
-                                      reachable_peers[a]),
-                                     (len(shares_per_peer.get(b, [])),
-                                      reachable_peers[b]) ))
-            target_peerid = possible_homes[0]
-            target_map.add(shnum, (target_peerid, None, None) )
-            shares_per_peer.add(target_peerid, shnum)
-
-        assert not shares_needing_homes
-
-        target_info = (target_map, shares_per_peer)
-        return target_info
-
-    def _query_peers_done(self, target_info):
-        self._obtain_privkey_started = now = time.time()
-        elapsed = time.time() - self._query_peers_started
-        self._status.timings["query"] = elapsed
-        return target_info
-
-    def _obtain_privkey(self, target_info):
-        # make sure we've got a copy of our private key.
-        if self._privkey:
-            # Must have picked it up during _query_peers. We're good to go.
-            if "privkey_fetch" not in self._status.timings:
-                self._status.timings["privkey_fetch"] = 0.0
-            return target_info
-
-        # Nope, we haven't managed to grab a copy, and we still need it. Ask
-        # peers one at a time until we get a copy. Only bother asking peers
-        # who've admitted to holding a share.
-
-        self._privkey_fetch_started = time.time()
-        target_map, shares_per_peer = target_info
-        # pull shares from self._encprivkey_shares
-        if not self._encprivkey_shares:
-            raise NotEnoughPeersError("Unable to find a copy of the privkey")
-
-        (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
-        ss = self._storage_servers[peerid]
-        self.log("trying to obtain privkey from %s shnum %d" %
-                 (idlib.shortnodeid_b2a(peerid), shnum))
-        d = self._do_privkey_query(ss, peerid, shnum, offset, length)
-        d.addErrback(self.log_err)
-        d.addCallback(lambda res: self._obtain_privkey(target_info))
-        return d
-
-    def _do_privkey_query(self, rref, peerid, shnum, offset, length):
-        started = time.time()
-        d = self._do_read(rref, peerid, self._storage_index,
-                          [shnum], [(offset, length)])
-        d.addCallback(self._privkey_query_response, peerid, shnum, started)
-        return d
-
-    def _privkey_query_response(self, datav, peerid, shnum, started):
-        elapsed = time.time() - started
-        self._status.add_per_server_time(peerid, "read", elapsed)
-
-        data = datav[shnum][0]
-        self._try_to_validate_privkey(data, peerid, shnum)
-
-        elapsed = time.time() - self._privkey_fetch_started
-        self._status.timings["privkey_fetch"] = elapsed
-        self._status.privkey_from = peerid
-
-    def _obtain_privkey_done(self, target_info):
-        elapsed = time.time() - self._obtain_privkey_started
-        self._status.timings["privkey"] = elapsed
-        return target_info
-
-    def _encrypt_and_encode(self, target_info,
-                            newdata, readkey, IV,
-                            required_shares, total_shares):
+        # this is CPU intensive but easy to analyze. We create a sort order
+        # for each peerid. If the peerid is marked as bad, we don't even put
+        # them in the list. Then we care about the number of shares which
+        # have already been assigned to them. After that we care about their
+        # permutation order.
+        old_assignments = DictOfSets()
+        for (peerid, shnum) in self.goal:
+            old_assignments.add(peerid, shnum)
+
+        peerlist = []
+        for i, (peerid, ss) in enumerate(self.full_peerlist):
+            entry = (len(old_assignments[peerid]), i, peerid, ss)
+            peerlist.add(entry)
+        peerlist.sort()
+
+        new_assignments = []
+        # we then index this peerlist with an integer, because we may have to
+        # wrap. We update the goal as we go.
+        i = 0
+        for shnum in homeless_shares:
+            (ignored1, ignored2, peerid, ss) = peerlist[i]
+            self.goal.add( (peerid, shnum) )
+            i += 1
+            if i >= len(peerlist):
+                i = 0
+
+
+
+    def _encrypt_and_encode(self):
+        # this returns a Deferred that fires with a list of (sharedata,
+        # sharenum) tuples. TODO: cache the ciphertext, only produce the
+        # shares that we care about.
         self.log("_encrypt_and_encode")
 
-        started = time.time()
+        #started = time.time()
 
-        key = hashutil.ssk_readkey_data_hash(IV, readkey)
+        key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
         enc = AES(key)
-        crypttext = enc.process(newdata)
-        assert len(crypttext) == len(newdata)
+        crypttext = enc.process(self.newdata)
+        assert len(crypttext) == len(self.newdata)
 
-        now = time.time()
-        self._status.timings["encrypt"] = now - started
-        started = now
+        #now = time.time()
+        #self._status.timings["encrypt"] = now - started
+        #started = now
 
         # now apply FEC
 
-        # we limit the segment size as usual to constrain our memory
-        # footprint. The max segsize is higher for mutable files, because we
-        # want to support dirnodes with up to 10k children, and each child
-        # uses about 330 bytes. If you actually put that much into a
-        # directory you'll be using a footprint of around 14MB, which is
-        # higher than we'd like, but it is more important right now to
-        # support large directories than to make memory usage small when you
-        # use them. Once we implement MDMF (with multiple segments), we will
-        # drop this back down, probably to 128KiB.
-        self.MAX_SEGMENT_SIZE = 3500000
-        data_length = len(crypttext)
-
-        segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
-        # this must be a multiple of self.required_shares
-        segment_size = mathutil.next_multiple(segment_size, required_shares)
-        if segment_size:
-            self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
-        else:
-            self.num_segments = 0
-        assert self.num_segments in [0, 1,] # SDMF restrictions
         fec = codec.CRSEncoder()
-        fec.set_params(segment_size, required_shares, total_shares)
+        fec.set_params(self.segment_size,
+                       self.required_shares, self.total_shares)
         piece_size = fec.get_block_size()
-        crypttext_pieces = [None] * required_shares
+        crypttext_pieces = [None] * self.required_shares
         for i in range(len(crypttext_pieces)):
             offset = i * piece_size
             piece = crypttext[offset:offset+piece_size]
@@ -1722,24 +1605,16 @@ class Publish:
 
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
-            elapsed = time.time() - started
-            self._status.timings["encode"] = elapsed
+            #elapsed = time.time() - started
+            #self._status.timings["encode"] = elapsed
             return res
         d.addCallback(_done_encoding)
-        d.addCallback(lambda shares_and_shareids:
-                      (shares_and_shareids,
-                       required_shares, total_shares,
-                       segment_size, data_length,
-                       target_info) )
         return d
 
-    def _generate_shares(self, (shares_and_shareids,
-                                required_shares, total_shares,
-                                segment_size, data_length,
-                                target_info),
-                         seqnum, IV):
+    def _generate_shares(self, shares_and_shareids):
+        # this sets self.shares and self.root_hash
         self.log("_generate_shares")
-        started = time.time()
+        #started = time.time()
 
         # we should know these by now
         privkey = self._privkey
@@ -1749,7 +1624,7 @@ class Publish:
         (shares, share_ids) = shares_and_shareids
 
         assert len(shares) == len(share_ids)
-        assert len(shares) == total_shares
+        assert len(shares) == self.total_shares
         all_shares = {}
         block_hash_trees = {}
         share_hash_leaves = [None] * len(shares)
@@ -1767,7 +1642,7 @@ class Publish:
             assert leaf is not None
         share_hash_tree = hashtree.HashTree(share_hash_leaves)
         share_hash_chain = {}
-        for shnum in range(total_shares):
+        for shnum in range(self.total_shares):
             needed_hashes = share_hash_tree.needed_hashes(shnum)
             share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                               for i in needed_hashes ] )
@@ -1775,24 +1650,24 @@ class Publish:
         assert len(root_hash) == 32
         self.log("my new root_hash is %s" % base32.b2a(root_hash))
 
-        prefix = pack_prefix(seqnum, root_hash, IV,
-                             required_shares, total_shares,
-                             segment_size, data_length)
+        prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
+                             self.required_shares, self.total_shares,
+                             self.segment_size, len(self.newdata))
 
         # now pack the beginning of the share. All shares are the same up
         # to the signature, then they have divergent share hash chains,
-        # then completely different block hash trees + IV + share data,
+        # then completely different block hash trees + salt + share data,
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.
 
-        sign_started = time.time()
+        #sign_started = time.time()
         signature = privkey.sign(prefix)
-        self._status.timings["sign"] = time.time() - sign_started
+        #self._status.timings["sign"] = time.time() - sign_started
 
         verification_key = pubkey.serialize()
 
         final_shares = {}
-        for shnum in range(total_shares):
+        for shnum in range(self.total_shares):
             final_share = pack_share(prefix,
                                      verification_key,
                                      signature,
@@ -1801,14 +1676,15 @@ class Publish:
                                      all_shares[shnum],
                                      encprivkey)
             final_shares[shnum] = final_share
-        elapsed = time.time() - started
-        self._status.timings["pack"] = elapsed
-        return (seqnum, root_hash, final_shares, target_info)
+        #elapsed = time.time() - started
+        #self._status.timings["pack"] = elapsed
+        self.shares = final_shares
+        self.root_hash = root_hash
 
 
-    def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
+    def _send_shares(self, needed):
         self.log("_send_shares")
-        started = time.time()
+        #started = time.time()
 
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
@@ -1817,46 +1693,85 @@ class Publish:
         # surprises here are *not* indications of UncoordinatedWriteError,
         # and we'll need to respond to them more gracefully.)
 
-        target_map, shares_per_peer = target_info
-
-        my_checkstring = pack_checkstring(seqnum, root_hash, IV)
-        peer_messages = {}
-        expected_old_shares = {}
-
-        for shnum, peers in target_map.items():
-            for (peerid, old_seqnum, old_root_hash) in peers:
-                testv = [(0, len(my_checkstring), "le", my_checkstring)]
-                new_share = final_shares[shnum]
-                writev = [(0, new_share)]
-                if peerid not in peer_messages:
-                    peer_messages[peerid] = {}
-                peer_messages[peerid][shnum] = (testv, writev, None)
-                if peerid not in expected_old_shares:
-                    expected_old_shares[peerid] = {}
-                expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
-
-        read_vector = [(0, len(my_checkstring))]
+        # needed is a set of (peerid, shnum) tuples. The first thing we do is
+        # organize it by peerid.
+
+        peermap = DictOfSets()
+        for (peerid, shnum) in needed:
+            peermap[peerid].add(shnum)
+
+        # the next thing is to build up a bunch of test vectors. The
+        # semantics of Publish are that we perform the operation if the world
+        # hasn't changed since the ServerMap was constructed (more or less).
+        # For every share we're trying to place, we create a test vector that
+        # tests to see if the server*share still corresponds to the
+        # map.
+
+        all_tw_vectors = {} # maps peerid to tw_vectors
+        sm = self._servermap.servermap
+
+        for (peerid, shnum) in needed:
+            testvs = []
+            for (old_shnum, old_versionid, old_timestamp) in sm[peerid]:
+                if old_shnum == shnum:
+                    # an old version of that share already exists on the
+                    # server, according to our servermap. We will create a
+                    # request that attempts to replace it.
+                    (old_seqnum, old_root_hash, old_salt, old_segsize,
+                     old_datalength, old_k, old_N, old_prefix,
+                     old_offsets_tuple) = old_versionid
+                    old_checkstring = pack_checkstring(old_seqnum,
+                                                       old_root_hash,
+                                                       old_salt)
+                    testv = [ (0, len(old_checkstring), "eq", old_checkstring) ]
+                    testvs.append(testv)
+                    break
+            if not testvs:
+                # add a testv that requires the share not exist
+                testv = [ (0, 1, 'eq', "") ]
+                testvs.append(testv)
+
+            # the write vector is simply the share
+            writev = [(0, self.shares[shnum])]
+
+            if peerid not in all_tw_vectors:
+                all_tw_vectors[peerid] = {}
+                # maps shnum to (testvs, writevs, new_length)
+            assert shnum not in all_tw_vectors[peerid]
+
+            all_tw_vectors[peerid][shnum] = (testvs, writev, None)
+
+        # we read the checkstring back from each share, however we only use
+        # it to detect whether there was a new share that we didn't know
+        # about. The success or failure of the write will tell us whether
+        # there was a collision or not. If there is a collision, the first
+        # thing we'll do is update the servermap, which will find out what
+        # happened. We could conceivably reduce a roundtrip by using the
+        # readv checkstring to populate the servermap, but really we'd have
+        # to read enough data to validate the signatures too, so it wouldn't
+        # be an overall win.
+        read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
 
-        dl = []
         # ok, send the messages!
-        self._surprised = False
-        dispatch_map = DictOfSets()
-
-        for peerid, tw_vectors in peer_messages.items():
+        started = time.time()
+        dl = []
+        for (peerid, tw_vectors) in all_tw_vectors.items():
 
             write_enabler = self._node.get_write_enabler(peerid)
             renew_secret = self._node.get_renewal_secret(peerid)
             cancel_secret = self._node.get_cancel_secret(peerid)
             secrets = (write_enabler, renew_secret, cancel_secret)
+            shnums = tw_vectors.keys()
 
             d = self._do_testreadwrite(peerid, secrets,
                                        tw_vectors, read_vector)
-            d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
-                          peerid, expected_old_shares[peerid], dispatch_map,
-                          started)
+            d.addCallbacks(self._got_write_answer, self._got_write_error,
+                           callbackArgs=(peerid, shnums, started),
+                           errbackArgs=(peerid, shnums, started))
+            d.addErrback(self.error, peerid)
             dl.append(d)
 
-        d = defer.DeferredList(dl, fireOnOneErrback=True)
+        d = defer.DeferredList(dl)
         def _done_sending(res):
             elapsed = time.time() - started
             self._status.timings["push"] = elapsed
@@ -1868,7 +1783,7 @@ class Publish:
 
     def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
-        storage_index = self._node._uri.storage_index
+        storage_index = self._storage_index
         ss = self._storage_servers[peerid]
 
         d = ss.callRemote("slot_testv_and_readv_and_writev",
@@ -1878,54 +1793,63 @@ class Publish:
                           read_vector)
         return d
 
-    def _got_write_answer(self, answer, tw_vectors, my_checkstring,
-                          peerid, expected_old_shares,
-                          dispatch_map, started):
+    def _got_write_answer(self, answer, peerid, shnums, started):
         lp = self.log("_got_write_answer from %s" %
                       idlib.shortnodeid_b2a(peerid))
-        elapsed = time.time() - started
-        self._status.add_per_server_time(peerid, "write", elapsed)
+        for shnum in shnums:
+            self.outstanding.discard( (peerid, shnum) )
 
         wrote, read_data = answer
-        surprised = False
 
-        (new_seqnum,new_root_hash,new_IV) = unpack_checkstring(my_checkstring)
+        if not wrote:
+            # TODO: use the checkstring to add information to the log message
+            #self.log("somebody modified the share on us:"
+            #         " shnum=%d: I thought they had #%d:R=%s,"
+            #         " but testv reported #%d:R=%s" %
+            #         (shnum,
+            #          seqnum, base32.b2a(root_hash)[:4],
+            #          old_seqnum, base32.b2a(old_root_hash)[:4]),
+            #         parent=lp, level=log.WEIRD)
+            self.log("our testv failed, so the write did not happen",
+                     parent=lp, level=log.WEIRD)
+            self.surprised = True
+            self.bad_peers.add(peerid) # don't ask them again
+            # self.loop() will take care of finding new homes
+            return
 
-        if wrote:
-            for shnum in tw_vectors:
-                dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
-        else:
-            # surprise! our testv failed, so the write did not happen
-            self.log("our testv failed, that write did not happen",
+        sm = self._servermap.servermap
+        for shnum in shnums:
+            self.placed.add( (peerid, shnum) )
+            # and update the servermap. We strip the old entry out..
+            newset = set([ t
+                           for t in sm[peerid]
+                           if t[0] != shnum ])
+            sm[peerid] = newset
+            # and add a new one
+            sm[peerid].add( (shnum, self.versioninfo, started) )
+
+        surprise_shares = set(read_data.keys()) - set(shnums)
+        if surprise_shares:
+            self.log("they had shares %s that we didn't know about" %
+                     (list(surprise_shares),),
                      parent=lp, level=log.WEIRD)
-            surprised = True
-
-        for shnum, (old_cs,) in read_data.items():
-            (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)
-
-            if not wrote:
-                dispatch_map.add(shnum, (peerid, old_seqnum, old_root_hash))
-
-            if shnum not in expected_old_shares:
-                # surprise! there was a share we didn't know about
-                self.log("they had share %d that we didn't know about" % shnum,
-                         parent=lp, level=log.WEIRD)
-                surprised = True
-            else:
-                seqnum, root_hash = expected_old_shares[shnum]
-                if seqnum is not None:
-                    if seqnum != old_seqnum or root_hash != old_root_hash:
-                        # surprise! somebody modified the share on us
-                        self.log("somebody modified the share on us:"
-                                 " shnum=%d: I thought they had #%d:R=%s,"
-                                 " but testv reported #%d:R=%s" %
-                                 (shnum,
-                                  seqnum, base32.b2a(root_hash)[:4],
-                                  old_seqnum, base32.b2a(old_root_hash)[:4]),
-                                 parent=lp, level=log.WEIRD)
-                        surprised = True
-        if surprised:
-            self._surprised = True
+            self.surprised = True
+            return
+
+        # self.loop() will take care of checking to see if we're done
+        return
+
+    def _got_write_error(self, f, peerid, shnums, started):
+        for shnum in shnums:
+            self.outstanding.discard( (peerid, shnum) )
+        self.bad_peers.add(peerid)
+        self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
+                 shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
+                 level=log.UNUSUAL)
+        # self.loop() will take care of checking to see if we're done
+        return
+
+
 
     def _log_dispatch_map(self, dispatch_map):
         for shnum, places in dispatch_map.items():
@@ -1960,6 +1884,30 @@ class Publish:
         return self._status
 
 
+    def _do_privkey_query(self, rref, peerid, shnum, offset, length):
+        started = time.time()
+        d = self._do_read(rref, peerid, self._storage_index,
+                          [shnum], [(offset, length)])
+        d.addCallback(self._privkey_query_response, peerid, shnum, started)
+        return d
+
+    def _privkey_query_response(self, datav, peerid, shnum, started):
+        elapsed = time.time() - started
+        self._status.add_per_server_time(peerid, "read", elapsed)
+
+        data = datav[shnum][0]
+        self._try_to_validate_privkey(data, peerid, shnum)
+
+        elapsed = time.time() - self._privkey_fetch_started
+        self._status.timings["privkey_fetch"] = elapsed
+        self._status.privkey_from = peerid
+
+    def _obtain_privkey_done(self, target_info):
+        elapsed = time.time() - self._obtain_privkey_started
+        self._status.timings["privkey"] = elapsed
+        return target_info
+
+
 # use client.create_mutable_file() to make one of these
 
 class MutableFileNode:
@@ -2181,9 +2129,35 @@ class MutableFileNode:
         assert self._pubkey, "update_servermap must be called before publish"
         d = self.obtain_lock()
         d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
+        # p = self.publish_class(self)
+        # self._client.notify_publish(p)
         d.addCallback(self.release_lock)
         return d
 
+    def modify(self, modifier, *args, **kwargs):
+        """I use a modifier callback to apply a change to the mutable file.
+        I implement the following pseudocode::
+
+         obtain_mutable_filenode_lock()
+         while True:
+           update_servermap(MODE_WRITE)
+           old = retrieve_best_version()
+           new = modifier(old, *args, **kwargs)
+           if new == old: break
+           try:
+             publish(new)
+           except UncoordinatedWriteError:
+             continue
+           break
+         release_mutable_filenode_lock()
+
+        The idea is that your modifier function can apply a delta of some
+        sort, and it will be re-run as necessary until it succeeds. The
+        modifier must inspect the old version to see whether its delta has
+        already been applied: if so it should return the contents unmodified.
+        """
+        NotImplementedError
+
     #################################
 
     def check(self):