]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable: WIP. make Publish work, remove some test scaffolding. test_system still...
authorBrian Warner <warner@allmydata.com>
Fri, 11 Apr 2008 01:44:06 +0000 (18:44 -0700)
committerBrian Warner <warner@allmydata.com>
Fri, 11 Apr 2008 01:44:06 +0000 (18:44 -0700)
src/allmydata/mutable.py
src/allmydata/test/test_mutable.py

index 5fa05816e1906976577f31de1589e2cd003a5c03..956f83b6aaf8af5c8949c34857496951e9d2ae04 100644 (file)
@@ -1,5 +1,5 @@
 
-import os, struct, time, weakref
+import os, sys, struct, time, weakref
 from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
@@ -32,6 +32,9 @@ class UncoordinatedWriteError(Exception):
     def __repr__(self):
         return "<%s -- You, oh user, tried to change a file or directory at the same time as another process was trying to change it.  To avoid data loss, don't do this.  Please see docs/write_coordination.html for details.>" % (self.__class__.__name__,)
 
+class UnrecoverableFileError(Exception):
+    pass
+
 class CorruptShareError(Exception):
     def __init__(self, peerid, shnum, reason):
         self.args = (peerid, shnum, reason)
@@ -278,6 +281,18 @@ class ServerMap:
         self.last_update_mode = None
         self.last_update_time = 0
 
+    def dump(self, out=sys.stdout):
+        print >>out, "servermap:"
+        for (peerid, shares) in self.servermap.items():
+            for (shnum, versionid, timestamp) in sorted(shares):
+                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+                 offsets_tuple) = versionid
+                print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
+                              (idlib.shortnodeid_b2a(peerid), shnum,
+                               seqnum, base32.b2a(root_hash)[:4], k, N,
+                               datalength))
+        return out
+
     def make_versionmap(self):
         """Return a dict that maps versionid to sets of (shnum, peerid,
         timestamp) tuples."""
@@ -287,6 +302,18 @@ class ServerMap:
                 versionmap.add(verinfo, (shnum, peerid, timestamp))
         return versionmap
 
+    def shares_on_peer(self, peerid):
+        return set([shnum
+                    for (shnum, versionid, timestamp)
+                    in self.servermap.get(peerid, [])])
+
+    def version_on_peer(self, peerid, shnum):
+        shares = self.servermap.get(peerid, [])
+        for (sm_shnum, sm_versionid, sm_timestamp) in shares:
+            if sm_shnum == shnum:
+                return sm_versionid
+        return None
+
     def shares_available(self):
         """Return a dict that maps versionid to tuples of
         (num_distinct_shares, k) tuples."""
@@ -301,6 +328,13 @@ class ServerMap:
             all_shares[versionid] = (len(s), k)
         return all_shares
 
+    def highest_seqnum(self):
+        available = self.shares_available()
+        seqnums = [versionid[0]
+                   for versionid in available.keys()]
+        seqnums.append(0)
+        return max(seqnums)
+
     def recoverable_versions(self):
         """Return a set of versionids, one for each version that is currently
         recoverable."""
@@ -433,7 +467,10 @@ class ServermapUpdater:
         #  * if we only need the checkstring, then [0:75]
         #  * if we need to validate the checkstring sig, then [543ish:799ish]
         #  * if we need the verification key, then [107:436ish]
-        #  * the offset table at [75:107] tells us about the 'ish'
+        #   * the offset table at [75:107] tells us about the 'ish'
+        #  * if we need the encrypted private key, we want [-1216ish:]
+        #   * but we can't read from negative offsets
+        #   * the offset table tells us the 'ish', also the positive offset
         # A future version of the SMDF slot format should consider using
         # fixed-size slots so we can retrieve less data. For now, we'll just
         # read 2000 bytes, which also happens to read enough actual data to
@@ -442,6 +479,9 @@ class ServermapUpdater:
         if mode == MODE_CHECK:
             # we use unpack_prefix_and_signature, so we need 1k
             self._read_size = 1000
+        self._need_privkey = False
+        if mode == MODE_WRITE and not self._node._privkey:
+            self._need_privkey = True
 
         prefix = storage.si_b2a(self._storage_index)[:5]
         self._log_number = log.msg("SharemapUpdater(%s): starting" % prefix)
@@ -494,9 +534,6 @@ class ServermapUpdater:
         # might not wait for all of their answers to come back)
         self.num_peers_to_query = k + self.EPSILON
 
-        # TODO: initial_peers_to_query needs to be ordered list of (peerid,
-        # ss) tuples
-
         if self.mode == MODE_CHECK:
             initial_peers_to_query = dict(full_peerlist)
             must_query = set(initial_peers_to_query.keys())
@@ -509,12 +546,11 @@ class ServermapUpdater:
             initial_peers_to_query, must_query = self._build_initial_querylist()
             self.required_num_empty_peers = self.EPSILON
 
-            # TODO: also populate self._filenode._privkey
+            # TODO: arrange to read lots of data from k-ish servers, to avoid
+            # the extra round trip required to read large directories. This
+            # might also avoid the round trip required to read the encrypted
+            # private key.
 
-            # TODO: arrange to read 3KB from one peer who is likely to hold a
-            # share, so we can avoid the latency of that extra roundtrip. 3KB
-            # would get us the encprivkey from a dirnode with up to 7
-            # entries, allowing us to make an update in 2 RTT instead of 3.
         else:
             initial_peers_to_query, must_query = self._build_initial_querylist()
 
@@ -531,10 +567,8 @@ class ServermapUpdater:
         # contains the overflow (peers that we should tap if we don't get
         # enough responses)
 
-        d = defer.succeed(initial_peers_to_query)
-        d.addCallback(self._send_initial_requests)
-        d.addCallback(lambda res: self._done_deferred)
-        return d
+        self._send_initial_requests(initial_peers_to_query)
+        return self._done_deferred
 
     def _build_initial_querylist(self):
         initial_peers_to_query = {}
@@ -566,10 +600,6 @@ class ServermapUpdater:
         # might produce a result.
         return None
 
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
-        return d
-
     def _do_query(self, ss, peerid, storage_index, readsize):
         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -586,63 +616,18 @@ class ServermapUpdater:
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
         d.addBoth(self._check_for_done)
-        d.addErrback(log.err)
+        d.addErrback(self._fatal_error)
         return d
 
-    def _deserialize_pubkey(self, pubkey_s):
-        verifier = rsa.create_verifying_key_from_string(pubkey_s)
-        return verifier
-
-    def _try_to_extract_privkey(self, data, peerid, shnum):
-        try:
-            r = unpack_share(data)
-        except NeedMoreDataError, e:
-            # this share won't help us. oh well.
-            offset = e.encprivkey_offset
-            length = e.encprivkey_length
-            self.log("shnum %d on peerid %s: share was too short (%dB) "
-                     "to get the encprivkey; [%d:%d] ought to hold it" %
-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
-                      offset, offset+length))
-            # NOTE: if uncoordinated writes are taking place, someone might
-            # change the share (and most probably move the encprivkey) before
-            # we get a chance to do one of these reads and fetch it. This
-            # will cause us to see a NotEnoughPeersError(unable to fetch
-            # privkey) instead of an UncoordinatedWriteError . This is a
-            # nuisance, but it will go away when we move to DSA-based mutable
-            # files (since the privkey will be small enough to fit in the
-            # write cap).
-
-            self._encprivkey_shares.append( (peerid, shnum, offset, length))
-            return
-
-        (seqnum, root_hash, IV, k, N, segsize, datalen,
-         pubkey, signature, share_hash_chain, block_hash_tree,
-         share_data, enc_privkey) = r
-
-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
-
-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
-        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
-        if alleged_writekey != self._writekey:
-            self.log("invalid privkey from %s shnum %d" %
-                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
-            return
-
-        # it's good
-        self.log("got valid privkey from shnum %d on peerid %s" %
-                 (shnum, idlib.shortnodeid_b2a(peerid)))
-        self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
-        self._encprivkey = enc_privkey
-        self._node._populate_encprivkey(self._encprivkey)
-        self._node._populate_privkey(self._privkey)
+    def _do_read(self, ss, peerid, storage_index, shnums, readv):
+        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
+        return d
 
     def _got_results(self, datavs, peerid, readsize, stuff, started):
-        self.log(format="got result from [%(peerid)s], %(numshares)d shares",
-                 peerid=idlib.shortnodeid_b2a(peerid),
-                 numshares=len(datavs),
-                 level=log.NOISY)
+        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
+                     peerid=idlib.shortnodeid_b2a(peerid),
+                     numshares=len(datavs),
+                     level=log.NOISY)
         self._queries_outstanding.discard(peerid)
         self._must_query.discard(peerid)
         self._queries_completed += 1
@@ -655,10 +640,14 @@ class ServermapUpdater:
         else:
             self._empty_peers.add(peerid)
 
+        last_verinfo = None
+        last_shnum = None
         for shnum,datav in datavs.items():
             data = datav[0]
             try:
-                self._got_results_one_share(shnum, data, peerid)
+                verinfo = self._got_results_one_share(shnum, data, peerid)
+                last_verinfo = verinfo
+                last_shnum = shnum
             except CorruptShareError, e:
                 # log it and give the other shares a chance to be processed
                 f = failure.Failure()
@@ -667,8 +656,27 @@ class ServermapUpdater:
                 self._last_failure = f
                 self._servermap.problems.append(f)
                 pass
+
+        if self._need_privkey and last_verinfo:
+            # send them a request for the privkey. We send one request per
+            # server.
+            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+             offsets_tuple) = last_verinfo
+            o = dict(offsets_tuple)
+
+            self._queries_outstanding.add(peerid)
+            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
+            ss = self._servermap.connections[peerid]
+            d = self._do_read(ss, peerid, self._storage_index,
+                              [last_shnum], readv)
+            d.addCallback(self._got_privkey_results, peerid, last_shnum)
+            d.addErrback(self._privkey_query_failed, peerid, last_shnum)
+            d.addErrback(log.err)
+            d.addCallback(self._check_for_done)
+            d.addErrback(self._fatal_error)
+
         # all done!
-        self.log("DONE")
+        self.log("_got_results done", parent=lp)
 
     def _got_results_one_share(self, shnum, data, peerid):
         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
@@ -689,6 +697,9 @@ class ServermapUpdater:
                                         "pubkey doesn't match fingerprint")
             self._node._pubkey = self._deserialize_pubkey(pubkey_s)
 
+        if self._need_privkey:
+            self._try_to_extract_privkey(data, peerid, shnum)
+
         (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
          ig_segsize, ig_datalen, offsets) = unpack_header(data)
         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
@@ -716,6 +727,57 @@ class ServermapUpdater:
         self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp))
         # and the versionmap
         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+        return verinfo
+
+    def _deserialize_pubkey(self, pubkey_s):
+        verifier = rsa.create_verifying_key_from_string(pubkey_s)
+        return verifier
+
+    def _try_to_extract_privkey(self, data, peerid, shnum):
+        try:
+            r = unpack_share(data)
+        except NeedMoreDataError, e:
+            # this share won't help us. oh well.
+            offset = e.encprivkey_offset
+            length = e.encprivkey_length
+            self.log("shnum %d on peerid %s: share was too short (%dB) "
+                     "to get the encprivkey; [%d:%d] ought to hold it" %
+                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
+                      offset, offset+length))
+            # NOTE: if uncoordinated writes are taking place, someone might
+            # change the share (and most probably move the encprivkey) before
+            # we get a chance to do one of these reads and fetch it. This
+            # will cause us to see a NotEnoughPeersError(unable to fetch
+            # privkey) instead of an UncoordinatedWriteError . This is a
+            # nuisance, but it will go away when we move to DSA-based mutable
+            # files (since the privkey will be small enough to fit in the
+            # write cap).
+
+            self._encprivkey_shares.append( (peerid, shnum, offset, length))
+            return
+
+        (seqnum, root_hash, IV, k, N, segsize, datalen,
+         pubkey, signature, share_hash_chain, block_hash_tree,
+         share_data, enc_privkey) = r
+
+        return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
+
+    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
+
+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+        if alleged_writekey != self._writekey:
+            self.log("invalid privkey from %s shnum %d" %
+                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
+            return
+
+        # it's good
+        self.log("got valid privkey from shnum %d on peerid %s" %
+                 (shnum, idlib.shortnodeid_b2a(peerid)))
+        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+        self._node._populate_encprivkey(enc_privkey)
+        self._node._populate_privkey(privkey)
+        self._need_privkey = False
 
 
     def _query_failed(self, f, peerid):
@@ -730,6 +792,27 @@ class ServermapUpdater:
         self._queries_completed += 1
         self._last_failure = f
 
+    def _got_privkey_results(self, datavs, peerid, shnum):
+        self._queries_outstanding.discard(peerid)
+        if not self._need_privkey:
+            return
+        if shnum not in datavs:
+            self.log("privkey wasn't there when we asked it", level=log.WEIRD)
+            return
+        datav = datavs[shnum]
+        enc_privkey = datav[0]
+        self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+
+    def _privkey_query_failed(self, f, peerid, shnum):
+        self._queries_outstanding.discard(peerid)
+        self.log("error during privkey query: %s %s" % (f, f.value),
+                 level=log.WEIRD)
+        if not self._running:
+            return
+        self._queries_outstanding.discard(peerid)
+        self._servermap.problems.append(f)
+        self._last_failure = f
+
     def _check_for_done(self, res):
         # exit paths:
         #  return self._send_more_queries(outstanding) : send some more queries
@@ -821,6 +904,8 @@ class ServermapUpdater:
             num_not_responded = 0
             num_not_found = 0
             states = []
+            found_boundary = False
+
             for i,(peerid,ss) in enumerate(self.full_peerlist):
                 if peerid in self._bad_peers:
                     # query failed
@@ -835,14 +920,8 @@ class ServermapUpdater:
                         if num_not_found >= self.EPSILON:
                             self.log("MODE_WRITE: found our boundary, %s" %
                                      "".join(states))
-                            # we need to know that we've gotten answers from
-                            # everybody to the left of here
-                            if last_not_responded == -1:
-                                # we're done
-                                self.log("have all our answers")
-                                return self._done()
-                            # still waiting for somebody
-                            return self._send_more_queries(num_not_responded)
+                            found_boundary = True
+                            break
 
                 elif peerid in self._good_peers:
                     # yes shares
@@ -857,6 +936,25 @@ class ServermapUpdater:
                     last_not_responded = i
                     num_not_responded += 1
 
+            if found_boundary:
+                # we need to know that we've gotten answers from
+                # everybody to the left of here
+                if last_not_responded == -1:
+                    # we're done
+                    self.log("have all our answers")
+                    # .. unless we're still waiting on the privkey
+                    if self._need_privkey:
+                        self.log("but we're still waiting for the privkey")
+                        # if we found the boundary but we haven't yet found
+                        # the privkey, we may need to look further. If
+                        # somehow all the privkeys were corrupted (but the
+                        # shares were readable), then this is likely to do an
+                        # exhaustive search.
+                        return self._send_more_queries(MAX_IN_FLIGHT)
+                    return self._done()
+                # still waiting for somebody
+                return self._send_more_queries(num_not_responded)
+
             # if we hit here, we didn't find our boundary, so we're still
             # waiting for peers
             self.log("MODE_WRITE: no boundary yet, %s" % "".join(states))
@@ -869,11 +967,12 @@ class ServermapUpdater:
         return self._send_more_queries(MAX_IN_FLIGHT)
 
     def _send_more_queries(self, num_outstanding):
-        assert self.extra_peers # we shouldn't get here with nothing in reserve
         more_queries = []
 
         while True:
-            self.log(" there are %d queries outstanding" % len(self._queries_outstanding))
+            self.log(format=" there are %(outstanding)d queries outstanding",
+                     outstanding=len(self._queries_outstanding),
+                     level=log.NOISY)
             active_queries = len(self._queries_outstanding) + len(more_queries)
             if active_queries >= num_outstanding:
                 break
@@ -895,11 +994,15 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
-        self._servermap.last_update_mode = self._mode
+        self._servermap.last_update_mode = self.mode
         self._servermap.last_update_time = self._started
         # the servermap will not be touched after this
         eventually(self._done_deferred.callback, self._servermap)
 
+    def _fatal_error(self, f):
+        self.log("fatal error", failure=f, level=log.WEIRD)
+        self._done_deferred.errback(f)
+
 
 class Marker:
     pass
@@ -1270,9 +1373,9 @@ class Retrieve:
         self._running = False
         # res is either the new contents, or a Failure
         if isinstance(res, failure.Failure):
-            self.log("DONE, with failure", failure=res)
+            self.log("Retrieve done, with failure", failure=res)
         else:
-            self.log("DONE, success!: res=%s" % (res,))
+            self.log("Retrieve done, success!: res=%s" % (res,))
         eventually(self._done_deferred.callback, res)
 
 
@@ -1345,6 +1448,16 @@ class Publish:
     To make the initial publish, set servermap to None.
     """
 
+    # we limit the segment size as usual to constrain our memory footprint.
+    # The max segsize is higher for mutable files, because we want to support
+    # dirnodes with up to 10k children, and each child uses about 330 bytes.
+    # If you actually put that much into a directory you'll be using a
+    # footprint of around 14MB, which is higher than we'd like, but it is
+    # more important right now to support large directories than to make
+    # memory usage small when you use them. Once we implement MDMF (with
+    # multiple segments), we will drop this back down, probably to 128KiB.
+    MAX_SEGMENT_SIZE = 3500000
+
     def __init__(self, filenode, servermap):
         self._node = filenode
         self._servermap = servermap
@@ -1352,6 +1465,7 @@ class Publish:
         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
         num = self._node._client.log("Publish(%s): starting" % prefix)
         self._log_number = num
+        self._running = True
 
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
@@ -1381,6 +1495,8 @@ class Publish:
 
         self.log("starting publish, datalen is %s" % len(newdata))
 
+        self.done_deferred = defer.Deferred()
+
         self._writekey = self._node.get_writekey()
         assert self._writekey, "need write capability to publish"
 
@@ -1433,28 +1549,7 @@ class Publish:
                       current_share_peers)
         # TODO: add an errback too, probably to ignore that peer
 
-        # we limit the segment size as usual to constrain our memory
-        # footprint. The max segsize is higher for mutable files, because we
-        # want to support dirnodes with up to 10k children, and each child
-        # uses about 330 bytes. If you actually put that much into a
-        # directory you'll be using a footprint of around 14MB, which is
-        # higher than we'd like, but it is more important right now to
-        # support large directories than to make memory usage small when you
-        # use them. Once we implement MDMF (with multiple segments), we will
-        # drop this back down, probably to 128KiB.
-        self.MAX_SEGMENT_SIZE = 3500000
-
-        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
-        # this must be a multiple of self.required_shares
-        segment_size = mathutil.next_multiple(segment_size,
-                                              self.required_shares)
-        self.segment_size = segment_size
-        if segment_size:
-            self.num_segments = mathutil.div_ceil(len(self.newdata),
-                                                  segment_size)
-        else:
-            self.num_segments = 0
-        assert self.num_segments in [0, 1,] # SDMF restrictions
+        self.setup_encoding_parameters()
 
         self.surprised = False
 
@@ -1474,11 +1569,17 @@ class Publish:
         # When self.placed == self.goal, we're done.
         self.placed = set() # (peerid, shnum) tuples
 
+        # we also keep a mapping from peerid to RemoteReference. Each time we
+        # pull a connection out of the full peerlist, we add it to this for
+        # use later.
+        self.connections = {}
+
         # we use the servermap to populate the initial goal: this way we will
         # try to update each existing share in place.
         for (peerid, shares) in self._servermap.servermap.items():
             for (shnum, versionid, timestamp) in shares:
                 self.goal.add( (peerid, shnum) )
+                self.connections[peerid] = self._servermap.connections[peerid]
 
         # create the shares. We'll discard these as they are delivered. SMDF:
         # we're allowed to hold everything in memory.
@@ -1486,27 +1587,53 @@ class Publish:
         d = self._encrypt_and_encode()
         d.addCallback(self._generate_shares)
         d.addCallback(self.loop) # trigger delivery
+        d.addErrback(self._fatal_error)
 
         return self.done_deferred
 
-    def loop(self):
+    def setup_encoding_parameters(self):
+        segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
+        # this must be a multiple of self.required_shares
+        segment_size = mathutil.next_multiple(segment_size,
+                                              self.required_shares)
+        self.segment_size = segment_size
+        if segment_size:
+            self.num_segments = mathutil.div_ceil(len(self.newdata),
+                                                  segment_size)
+        else:
+            self.num_segments = 0
+        assert self.num_segments in [0, 1,] # SDMF restrictions
+
+    def _fatal_error(self, f):
+        self.log("error during loop", failure=f, level=log.SCARY)
+        self._done(f)
+
+    def loop(self, ignored=None):
+        self.log("entering loop", level=log.NOISY)
         self.update_goal()
         # how far are we from our goal?
         needed = self.goal - self.placed - self.outstanding
 
         if needed:
             # we need to send out new shares
-            d = self.send_shares(needed)
+            self.log(format="need to send %(needed)d new shares",
+                     needed=len(needed), level=log.NOISY)
+            d = self._send_shares(needed)
             d.addCallback(self.loop)
             d.addErrback(self._fatal_error)
             return
 
         if self.outstanding:
             # queries are still pending, keep waiting
+            self.log(format="%(outstanding)d queries still outstanding",
+                     outstanding=len(self.outstanding),
+                     level=log.NOISY)
             return
 
         # no queries outstanding, no placements needed: we're done
-        return self._done()
+        self.log("no queries outstanding, no placements needed: done",
+                 level=log.OPERATIONAL)
+        return self._done(None)
 
     def log_goal(self, goal):
         logmsg = []
@@ -1544,19 +1671,19 @@ class Publish:
         # TODO: 2: move those shares instead of copying them, to reduce future
         #       update work
 
-        # this is CPU intensive but easy to analyze. We create a sort order
-        # for each peerid. If the peerid is marked as bad, we don't even put
-        # them in the list. Then we care about the number of shares which
-        # have already been assigned to them. After that we care about their
-        # permutation order.
+        # this is a bit CPU intensive but easy to analyze. We create a sort
+        # order for each peerid. If the peerid is marked as bad, we don't
+        # even put them in the list. Then we care about the number of shares
+        # which have already been assigned to them. After that we care about
+        # their permutation order.
         old_assignments = DictOfSets()
         for (peerid, shnum) in self.goal:
             old_assignments.add(peerid, shnum)
 
         peerlist = []
         for i, (peerid, ss) in enumerate(self.full_peerlist):
-            entry = (len(old_assignments[peerid]), i, peerid, ss)
-            peerlist.add(entry)
+            entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
+            peerlist.append(entry)
         peerlist.sort()
 
         new_assignments = []
@@ -1566,6 +1693,7 @@ class Publish:
         for shnum in homeless_shares:
             (ignored1, ignored2, peerid, ss) = peerlist[i]
             self.goal.add( (peerid, shnum) )
+            self.connections[peerid] = ss
             i += 1
             if i >= len(peerlist):
                 i = 0
@@ -1681,6 +1809,18 @@ class Publish:
         self.shares = final_shares
         self.root_hash = root_hash
 
+        # we also need to build up the version identifier for what we're
+        # pushing. Extract the offsets from one of our shares.
+        assert final_shares
+        offsets = unpack_header(final_shares.values()[0])[-1]
+        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+        verinfo = (self._new_seqnum, root_hash, self.salt,
+                   self.segment_size, len(self.newdata),
+                   self.required_shares, self.total_shares,
+                   prefix, offsets_tuple)
+        self.versioninfo = verinfo
+
+
 
     def _send_shares(self, needed):
         self.log("_send_shares")
@@ -1698,7 +1838,7 @@ class Publish:
 
         peermap = DictOfSets()
         for (peerid, shnum) in needed:
-            peermap[peerid].add(shnum)
+            peermap.add(peerid, shnum)
 
         # the next thing is to build up a bunch of test vectors. The
         # semantics of Publish are that we perform the operation if the world
@@ -1712,7 +1852,7 @@ class Publish:
 
         for (peerid, shnum) in needed:
             testvs = []
-            for (old_shnum, old_versionid, old_timestamp) in sm[peerid]:
+            for (old_shnum, old_versionid, old_timestamp) in sm.get(peerid,[]):
                 if old_shnum == shnum:
                     # an old version of that share already exists on the
                     # server, according to our servermap. We will create a
@@ -1723,12 +1863,23 @@ class Publish:
                     old_checkstring = pack_checkstring(old_seqnum,
                                                        old_root_hash,
                                                        old_salt)
-                    testv = [ (0, len(old_checkstring), "eq", old_checkstring) ]
+                    testv = (0, len(old_checkstring), "eq", old_checkstring)
                     testvs.append(testv)
                     break
             if not testvs:
                 # add a testv that requires the share not exist
-                testv = [ (0, 1, 'eq', "") ]
+                #testv = (0, 1, 'eq', "")
+
+                # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
+                # constraints are handled. If the same object is referenced
+                # multiple times inside the arguments, foolscap emits a
+                # 'reference' token instead of a distinct copy of the
+                # argument. The bug is that these 'reference' tokens are not
+                # accepted by the inbound constraint code. To work around
+                # this, we need to prevent python from interning the
+                # (constant) tuple, by creating a new copy of this vector
+                # each time. This bug is fixed in later versions of foolscap.
+                testv = tuple([0, 1, 'eq', ""])
                 testvs.append(testv)
 
             # the write vector is simply the share
@@ -1768,7 +1919,7 @@ class Publish:
             d.addCallbacks(self._got_write_answer, self._got_write_error,
                            callbackArgs=(peerid, shnums, started),
                            errbackArgs=(peerid, shnums, started))
-            d.addErrback(self.error, peerid)
+            d.addErrback(self._fatal_error)
             dl.append(d)
 
         d = defer.DeferredList(dl)
@@ -1784,8 +1935,9 @@ class Publish:
     def _do_testreadwrite(self, peerid, secrets,
                           tw_vectors, read_vector):
         storage_index = self._storage_index
-        ss = self._storage_servers[peerid]
+        ss = self.connections[peerid]
 
+        #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
         d = ss.callRemote("slot_testv_and_readv_and_writev",
                           storage_index,
                           secrets,
@@ -1798,31 +1950,40 @@ class Publish:
                       idlib.shortnodeid_b2a(peerid))
         for shnum in shnums:
             self.outstanding.discard( (peerid, shnum) )
+        sm = self._servermap.servermap
 
         wrote, read_data = answer
 
         if not wrote:
-            # TODO: use the checkstring to add information to the log message
-            #self.log("somebody modified the share on us:"
-            #         " shnum=%d: I thought they had #%d:R=%s,"
-            #         " but testv reported #%d:R=%s" %
-            #         (shnum,
-            #          seqnum, base32.b2a(root_hash)[:4],
-            #          old_seqnum, base32.b2a(old_root_hash)[:4]),
-            #         parent=lp, level=log.WEIRD)
             self.log("our testv failed, so the write did not happen",
                      parent=lp, level=log.WEIRD)
             self.surprised = True
             self.bad_peers.add(peerid) # don't ask them again
+            # use the checkstring to add information to the log message
+            for (shnum,readv) in read_data.items():
+                checkstring = readv[0]
+                (other_seqnum,
+                 other_roothash,
+                 other_salt) = unpack_checkstring(checkstring)
+                expected_version = self._servermap.version_on_peer(peerid,
+                                                                   shnum)
+                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+                 offsets_tuple) = expected_version
+                self.log("somebody modified the share on us:"
+                         " shnum=%d: I thought they had #%d:R=%s,"
+                         " but testv reported #%d:R=%s" %
+                         (shnum,
+                          seqnum, base32.b2a(root_hash)[:4],
+                          other_seqnum, base32.b2a(other_roothash)[:4]),
+                         parent=lp, level=log.NOISY)
             # self.loop() will take care of finding new homes
             return
 
-        sm = self._servermap.servermap
         for shnum in shnums:
             self.placed.add( (peerid, shnum) )
             # and update the servermap. We strip the old entry out..
             newset = set([ t
-                           for t in sm[peerid]
+                           for t in sm.get(peerid, [])
                            if t[0] != shnum ])
             sm[peerid] = newset
             # and add a new one
@@ -1845,6 +2006,7 @@ class Publish:
         self.bad_peers.add(peerid)
         self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
                  shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
+                 failure=f,
                  level=log.UNUSUAL)
         # self.loop() will take care of checking to see if we're done
         return
@@ -1873,40 +2035,21 @@ class Publish:
         raise UncoordinatedWriteError("I was surprised!")
 
     def _done(self, res):
-        now = time.time()
-        self._status.timings["total"] = now - self._started
-        self._status.set_active(False)
-        self._status.set_status("Done")
-        self._status.set_progress(1.0)
+        if not self._running:
+            return
+        self._running = False
+        #now = time.time()
+        #self._status.timings["total"] = now - self._started
+        #self._status.set_active(False)
+        #self._status.set_status("Done")
+        #self._status.set_progress(1.0)
+        self.done_deferred.callback(res)
         return None
 
     def get_status(self):
         return self._status
 
 
-    def _do_privkey_query(self, rref, peerid, shnum, offset, length):
-        started = time.time()
-        d = self._do_read(rref, peerid, self._storage_index,
-                          [shnum], [(offset, length)])
-        d.addCallback(self._privkey_query_response, peerid, shnum, started)
-        return d
-
-    def _privkey_query_response(self, datav, peerid, shnum, started):
-        elapsed = time.time() - started
-        self._status.add_per_server_time(peerid, "read", elapsed)
-
-        data = datav[shnum][0]
-        self._try_to_validate_privkey(data, peerid, shnum)
-
-        elapsed = time.time() - self._privkey_fetch_started
-        self._status.timings["privkey_fetch"] = elapsed
-        self._status.privkey_from = peerid
-
-    def _obtain_privkey_done(self, target_info):
-        elapsed = time.time() - self._obtain_privkey_started
-        self._status.timings["privkey"] = elapsed
-        return target_info
-
 
 # use client.create_mutable_file() to make one of these
 
@@ -1991,13 +2134,6 @@ class MutableFileNode:
             verifier = signer.get_verifying_key()
             return verifier, signer
 
-    def _publish(self, initial_contents):
-        p = self.publish_class(self)
-        self._client.notify_publish(p)
-        d = p.publish(initial_contents)
-        d.addCallback(lambda res: self)
-        return d
-
     def _encrypt_privkey(self, writekey, privkey):
         enc = AES(writekey)
         crypttext = enc.process(privkey)
@@ -2114,7 +2250,7 @@ class MutableFileNode:
         d = self.obtain_lock()
         d.addCallback(lambda res:
                       ServermapUpdater(self, servermap, mode).update())
-        d.addCallback(self.release_lock)
+        d.addBoth(self.release_lock)
         return d
 
     def download_version(self, servermap, versionid):
@@ -2122,7 +2258,7 @@ class MutableFileNode:
         d = self.obtain_lock()
         d.addCallback(lambda res:
                       Retrieve(self, servermap, versionid).download())
-        d.addCallback(self.release_lock)
+        d.addBoth(self.release_lock)
         return d
 
     def publish(self, servermap, newdata):
@@ -2131,7 +2267,7 @@ class MutableFileNode:
         d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
         # p = self.publish_class(self)
         # self._client.notify_publish(p)
-        d.addCallback(self.release_lock)
+        d.addBoth(self.release_lock)
         return d
 
     def modify(self, modifier, *args, **kwargs):
@@ -2178,17 +2314,32 @@ class MutableFileNode:
     def download_to_data(self):
         d = self.obtain_lock()
         d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH))
-        d.addCallback(lambda smap:
-                      self.download_version(smap,
-                                            smap.best_recoverable_version()))
-        d.addCallback(self.release_lock)
+        def _updated(smap):
+            goal = smap.best_recoverable_version()
+            if not goal:
+                raise UnrecoverableFileError("no recoverable versions")
+            return self.download_version(smap, goal)
+        d.addCallback(_updated)
+        d.addBoth(self.release_lock)
+        return d
+
+    def _publish(self, initial_contents):
+        p = Publish(self, None)
+        d = p.publish(initial_contents)
+        d.addCallback(lambda res: self)
         return d
 
     def update(self, newdata):
-        return self._publish(newdata)
+        d = self.obtain_lock()
+        d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
+        d.addCallback(lambda smap:
+                      Publish(self, smap).publish(newdata))
+        d.addBoth(self.release_lock)
+        return d
 
     def overwrite(self, newdata):
-        return self._publish(newdata)
+        return self.update(newdata)
+
 
 class MutableWatcher(service.MultiService):
     MAX_PUBLISH_STATUSES = 20
index 1b188c10c67ea30ea12090787dd3bcf49d3789b4..32d228b614acefe70c45f6a301ac278e6005f072 100644 (file)
@@ -1,57 +1,29 @@
 
-import itertools, struct, re
+import struct
 from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.python import failure
-from allmydata import mutable, uri, dirnode, download
+from allmydata import mutable, uri, download
 from allmydata.util import base32
 from allmydata.util.idlib import shortnodeid_b2a
 from allmydata.util.hashutil import tagged_hash
 from allmydata.encode import NotEnoughPeersError
-from allmydata.interfaces import IURI, INewDirectoryURI, \
-     IMutableFileURI, IUploadable, IFileURI
-from allmydata.filenode import LiteralFileNode
+from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
 from foolscap.eventual import eventually, fireEventually
 from foolscap.logging import log
 import sha
 
-#from allmydata.test.common import FakeMutableFileNode
-#FakeFilenode = FakeMutableFileNode
+# this "FastMutableFileNode" exists solely to speed up tests by using smaller
+# public/private keys. Once we switch to fast DSA-based keys, we can get rid
+# of this.
 
-class FakeFilenode(mutable.MutableFileNode):
-    counter = itertools.count(1)
-    all_contents = {}
-    all_rw_friends = {}
+class FastMutableFileNode(mutable.MutableFileNode):
+    SIGNATURE_KEY_SIZE = 522
 
-    def create(self, initial_contents):
-        d = mutable.MutableFileNode.create(self, initial_contents)
-        def _then(res):
-            self.all_contents[self.get_uri()] = initial_contents
-            return res
-        d.addCallback(_then)
-        return d
-    def init_from_uri(self, myuri):
-        mutable.MutableFileNode.init_from_uri(self, myuri)
-        return self
-    def _generate_pubprivkeys(self, key_size):
-        count = self.counter.next()
-        return FakePubKey(count), FakePrivKey(count)
-    def _publish(self, initial_contents):
-        self.all_contents[self.get_uri()] = initial_contents
-        return defer.succeed(self)
-
-    def download_to_data(self):
-        if self.is_readonly():
-            assert self.all_rw_friends.has_key(self.get_uri()), (self.get_uri(), id(self.all_rw_friends))
-            return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
-        else:
-            return defer.succeed(self.all_contents[self.get_uri()])
-    def update(self, newdata):
-        self.all_contents[self.get_uri()] = newdata
-        return defer.succeed(None)
-    def overwrite(self, newdata):
-        return self.update(newdata)
+# this "FakeStorage" exists to put the share data in RAM and avoid using real
+# network connections, both to speed up the tests and to reduce the amount of
+# non-mutable.py code being exercised.
 
 class FakeStorage:
     # this class replaces the collection of storage servers, allowing the
@@ -77,7 +49,7 @@ class FakeStorage:
     def read(self, peerid, storage_index):
         shares = self._peers.get(peerid, {})
         if self._sequence is None:
-            return shares
+            return defer.succeed(shares)
         d = defer.Deferred()
         if not self._pending:
             reactor.callLater(1.0, self._fire_readers)
@@ -106,42 +78,68 @@ class FakeStorage:
         shares[shnum] = f.getvalue()
 
 
-class FakePublish(mutable.Publish):
-
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        assert ss[0] == peerid
-        assert shnums == []
+class FakeStorageServer:
+    def __init__(self, peerid, storage):
+        self.peerid = peerid
+        self.storage = storage
+        self.queries = 0
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
         d = fireEventually()
-        d.addCallback(lambda res: self._storage.read(peerid, storage_index))
+        d.addCallback(lambda res: _call())
+        return d
+
+    def slot_readv(self, storage_index, shnums, readv):
+        d = self.storage.read(self.peerid, storage_index)
+        def _read(shares):
+            response = {}
+            for shnum in shares:
+                if shnums and shnum not in shnums:
+                    continue
+                vector = response[shnum] = []
+                for (offset, length) in readv:
+                    assert isinstance(offset, (int, long)), offset
+                    assert isinstance(length, (int, long)), length
+                    vector.append(shares[shnum][offset:offset+length])
+            return response
+        d.addCallback(_read)
         return d
 
-    def _do_testreadwrite(self, peerid, secrets,
-                          tw_vectors, read_vector):
-        storage_index = self._node._uri.storage_index
+    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
+                                        tw_vectors, read_vector):
         # always-pass: parrot the test vectors back to them.
         readv = {}
         for shnum, (testv, writev, new_length) in tw_vectors.items():
             for (offset, length, op, specimen) in testv:
                 assert op in ("le", "eq", "ge")
+            # TODO: this isn't right, the read is controlled by read_vector,
+            # not by testv
             readv[shnum] = [ specimen
                              for (offset, length, op, specimen)
                              in testv ]
             for (offset, data) in writev:
-                self._storage.write(peerid, storage_index, shnum, offset, data)
+                self.storage.write(self.peerid, storage_index, shnum,
+                                   offset, data)
         answer = (True, readv)
-        return defer.succeed(answer)
-
+        return fireEventually(answer)
 
 
-
-class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
-    filenode_class = FakeFilenode
+# our "FakeClient" has just enough functionality of the real Client to let
+# the tests run.
 
 class FakeClient:
+    mutable_file_node_class = FastMutableFileNode
+
     def __init__(self, num_peers=10):
+        self._storage = FakeStorage()
         self._num_peers = num_peers
         self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
                          for i in range(self._num_peers)]
+        self._connections = dict([(peerid, FakeStorageServer(peerid,
+                                                             self._storage))
+                                  for peerid in self._peerids])
         self.nodeid = "fakenodeid"
 
     def log(self, msg, **kw):
@@ -152,17 +150,8 @@ class FakeClient:
     def get_cancel_secret(self):
         return "I hereby permit you to cancel my leases"
 
-    def create_empty_dirnode(self):
-        n = FakeNewDirectoryNode(self)
-        d = n.create()
-        d.addCallback(lambda res: n)
-        return d
-
-    def create_dirnode_from_uri(self, u):
-        return FakeNewDirectoryNode(self).init_from_uri(u)
-
     def create_mutable_file(self, contents=""):
-        n = FakeFilenode(self)
+        n = self.mutable_file_node_class(self)
         d = n.create(contents)
         d.addCallback(lambda res: n)
         return d
@@ -172,25 +161,16 @@ class FakeClient:
 
     def create_node_from_uri(self, u):
         u = IURI(u)
-        if INewDirectoryURI.providedBy(u):
-            return self.create_dirnode_from_uri(u)
-        if IFileURI.providedBy(u):
-            if isinstance(u, uri.LiteralFileURI):
-                return LiteralFileNode(u, self)
-            else:
-                # CHK
-                raise RuntimeError("not simulated")
         assert IMutableFileURI.providedBy(u), u
-        res = FakeFilenode(self).init_from_uri(u)
+        res = self.mutable_file_node_class(self).init_from_uri(u)
         return res
 
     def get_permuted_peers(self, service_name, key):
         """
         @return: list of (peerid, connection,)
         """
-        peers_and_connections = [(pid, (pid,)) for pid in self._peerids]
         results = []
-        for peerid, connection in peers_and_connections:
+        for (peerid, connection) in self._connections.items():
             assert isinstance(peerid, str)
             permuted = sha.new(key + peerid).digest()
             results.append((permuted, peerid, connection))
@@ -205,33 +185,11 @@ class FakeClient:
         #d.addCallback(self.create_mutable_file)
         def _got_data(datav):
             data = "".join(datav)
-            #newnode = FakeFilenode(self)
+            #newnode = FastMutableFileNode(self)
             return uri.LiteralFileURI(data)
         d.addCallback(_got_data)
         return d
 
-class FakePubKey:
-    def __init__(self, count):
-        self.count = count
-    def serialize(self):
-        return "PUBKEY-%d" % self.count
-    def verify(self, msg, signature):
-        if signature[:5] != "SIGN(":
-            return False
-        if signature[5:-1] != msg:
-            return False
-        if signature[-1] != ")":
-            return False
-        return True
-
-class FakePrivKey:
-    def __init__(self, count):
-        self.count = count
-    def serialize(self):
-        return "PRIVKEY-%d" % self.count
-    def sign(self, data):
-        return "SIGN(%s)" % data
-
 
 class Filenode(unittest.TestCase):
     def setUp(self):
@@ -240,7 +198,22 @@ class Filenode(unittest.TestCase):
     def test_create(self):
         d = self.client.create_mutable_file()
         def _created(n):
-            d = n.overwrite("contents 1")
+            self.failUnless(isinstance(n, FastMutableFileNode))
+            peer0 = self.client._peerids[0]
+            shnums = self.client._storage._peers[peer0].keys()
+            self.failUnlessEqual(len(shnums), 1)
+        d.addCallback(_created)
+        return d
+
+    def test_upload_and_download(self):
+        d = self.client.create_mutable_file()
+        def _created(n):
+            d = defer.succeed(None)
+            d.addCallback(lambda res: n.update_servermap())
+            d.addCallback(lambda smap: smap.dump(StringIO()))
+            d.addCallback(lambda sio:
+                          self.failUnless("3-of-10" in sio.getvalue()))
+            d.addCallback(lambda res: n.overwrite("contents 1"))
             d.addCallback(lambda res: self.failUnlessIdentical(res, None))
             d.addCallback(lambda res: n.download_to_data())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
@@ -268,40 +241,61 @@ class Filenode(unittest.TestCase):
         d.addCallback(_created)
         return d
 
+    def test_upload_and_download_full_size_keys(self):
+        self.client.mutable_file_node_class = mutable.MutableFileNode
+        d = self.client.create_mutable_file()
+        def _created(n):
+            d = defer.succeed(None)
+            d.addCallback(lambda res: n.update_servermap())
+            d.addCallback(lambda smap: smap.dump(StringIO()))
+            d.addCallback(lambda sio:
+                          self.failUnless("3-of-10" in sio.getvalue()))
+            d.addCallback(lambda res: n.overwrite("contents 1"))
+            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
+            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+            d.addCallback(lambda res: n.overwrite("contents 2"))
+            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+            d.addCallback(lambda res: n.download(download.Data()))
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+            d.addCallback(lambda res: n.update("contents 3"))
+            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
+            return d
+        d.addCallback(_created)
+        return d
+
 
 class Publish(unittest.TestCase):
     def test_encrypt(self):
         c = FakeClient()
-        fn = FakeFilenode(c)
-        # .create usually returns a Deferred, but we happen to know it's
-        # synchronous
+        fn = FastMutableFileNode(c)
         CONTENTS = "some initial contents"
-        fn.create(CONTENTS)
-        p = mutable.Publish(fn)
-        target_info = None
-        d = defer.maybeDeferred(p._encrypt_and_encode, target_info,
-                                CONTENTS, "READKEY", "IV"*8, 3, 10)
-        def _done( ((shares, share_ids),
-                    required_shares, total_shares,
-                    segsize, data_length, target_info2) ):
+        d = fn.create(CONTENTS)
+        def _created(res):
+            p = mutable.Publish(fn, None)
+            p.salt = "SALT" * 4
+            p.readkey = "\x00" * 16
+            p.newdata = CONTENTS
+            p.required_shares = 3
+            p.total_shares = 10
+            p.setup_encoding_parameters()
+            return p._encrypt_and_encode()
+        d.addCallback(_created)
+        def _done(shares_and_shareids):
+            (shares, share_ids) = shares_and_shareids
             self.failUnlessEqual(len(shares), 10)
             for sh in shares:
                 self.failUnless(isinstance(sh, str))
                 self.failUnlessEqual(len(sh), 7)
             self.failUnlessEqual(len(share_ids), 10)
-            self.failUnlessEqual(required_shares, 3)
-            self.failUnlessEqual(total_shares, 10)
-            self.failUnlessEqual(segsize, 21)
-            self.failUnlessEqual(data_length, len(CONTENTS))
-            self.failUnlessIdentical(target_info, target_info2)
         d.addCallback(_done)
         return d
 
     def test_generate(self):
         c = FakeClient()
-        fn = FakeFilenode(c)
-        # .create usually returns a Deferred, but we happen to know it's
-        # synchronous
+        fn = FastMutableFileNode(c)
         CONTENTS = "some initial contents"
         fn.create(CONTENTS)
         p = mutable.Publish(fn)
@@ -328,7 +322,6 @@ class Publish(unittest.TestCase):
             self.failUnlessEqual(sorted(final_shares.keys()), range(10))
             for i,sh in final_shares.items():
                 self.failUnless(isinstance(sh, str))
-                self.failUnlessEqual(len(sh), 381)
                 # feed the share through the unpacker as a sanity-check
                 pieces = mutable.unpack_share(sh)
                 (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
@@ -340,12 +333,12 @@ class Publish(unittest.TestCase):
                 self.failUnlessEqual(N, 10)
                 self.failUnlessEqual(segsize, 21)
                 self.failUnlessEqual(datalen, len(CONTENTS))
-                self.failUnlessEqual(pubkey, FakePubKey(0).serialize())
+                self.failUnlessEqual(pubkey, p._pubkey.serialize())
                 sig_material = struct.pack(">BQ32s16s BBQQ",
-                                           0, seqnum, root_hash, IV,
+                                           0, p._new_seqnum, root_hash, IV,
                                            k, N, segsize, datalen)
-                self.failUnlessEqual(signature,
-                                     FakePrivKey(0).sign(sig_material))
+                self.failUnless(p._pubkey.verify(sig_material, signature))
+                #self.failUnlessEqual(signature, p._privkey.sign(sig_material))
                 self.failUnless(isinstance(share_hash_chain, dict))
                 self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++
                 for shnum,share_hash in share_hash_chain.items():
@@ -354,188 +347,36 @@ class Publish(unittest.TestCase):
                     self.failUnlessEqual(len(share_hash), 32)
                 self.failUnless(isinstance(block_hash_tree, list))
                 self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
-                self.failUnlessEqual(IV, "IV"*8)
+                self.failUnlessEqual(IV, "SALT"*4)
                 self.failUnlessEqual(len(share_data), len("%07d" % 1))
-                self.failUnlessEqual(enc_privkey, "encprivkey")
-            self.failUnlessIdentical(target_info, target_info2)
-        d.addCallback(_done)
-        return d
-
-    def setup_for_sharemap(self, num_peers):
-        c = FakeClient(num_peers)
-        fn = FakeFilenode(c)
-        s = FakeStorage()
-        # .create usually returns a Deferred, but we happen to know it's
-        # synchronous
-        CONTENTS = "some initial contents"
-        fn.create(CONTENTS)
-        p = FakePublish(fn)
-        p._storage_index = "\x00"*32
-        p._new_seqnum = 3
-        p._read_size = 1000
-        #r = mutable.Retrieve(fn)
-        p._storage = s
-        return c, p
-
-    def shouldFail(self, expected_failure, which, call, *args, **kwargs):
-        substring = kwargs.pop("substring", None)
-        d = defer.maybeDeferred(call, *args, **kwargs)
-        def _done(res):
-            if isinstance(res, failure.Failure):
-                res.trap(expected_failure)
-                if substring:
-                    self.failUnless(substring in str(res),
-                                    "substring '%s' not in '%s'"
-                                    % (substring, str(res)))
-            else:
-                self.fail("%s was supposed to raise %s, not get '%s'" %
-                          (which, expected_failure, res))
-        d.addBoth(_done)
-        return d
-
-    def test_sharemap_20newpeers(self):
-        c, p = self.setup_for_sharemap(20)
-
-        total_shares = 10
-        d = p._query_peers(total_shares)
-        def _done(target_info):
-            (target_map, shares_per_peer) = target_info
-            shares_per_peer = {}
-            for shnum in target_map:
-                for (peerid, old_seqnum, old_R) in target_map[shnum]:
-                    #print "shnum[%d]: send to %s [oldseqnum=%s]" % \
-                    #      (shnum, idlib.b2a(peerid), old_seqnum)
-                    if peerid not in shares_per_peer:
-                        shares_per_peer[peerid] = 1
-                    else:
-                        shares_per_peer[peerid] += 1
-            # verify that we're sending only one share per peer
-            for peerid, count in shares_per_peer.items():
-                self.failUnlessEqual(count, 1)
-        d.addCallback(_done)
+                self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
+        d.addCallback(_generated)
         return d
 
-    def test_sharemap_3newpeers(self):
-        c, p = self.setup_for_sharemap(3)
-
-        total_shares = 10
-        d = p._query_peers(total_shares)
-        def _done(target_info):
-            (target_map, shares_per_peer) = target_info
-            shares_per_peer = {}
-            for shnum in target_map:
-                for (peerid, old_seqnum, old_R) in target_map[shnum]:
-                    if peerid not in shares_per_peer:
-                        shares_per_peer[peerid] = 1
-                    else:
-                        shares_per_peer[peerid] += 1
-            # verify that we're sending 3 or 4 shares per peer
-            for peerid, count in shares_per_peer.items():
-                self.failUnless(count in (3,4), count)
-        d.addCallback(_done)
-        return d
-
-    def test_sharemap_nopeers(self):
-        c, p = self.setup_for_sharemap(0)
-
-        total_shares = 10
-        d = self.shouldFail(NotEnoughPeersError, "test_sharemap_nopeers",
-                            p._query_peers, total_shares)
-        return d
-
-    def test_write(self):
-        total_shares = 10
-        c, p = self.setup_for_sharemap(20)
-        p._privkey = FakePrivKey(0)
-        p._encprivkey = "encprivkey"
-        p._pubkey = FakePubKey(0)
-        # make some fake shares
-        CONTENTS = "some initial contents"
-        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
-        d = defer.maybeDeferred(p._query_peers, total_shares)
-        IV = "IV"*8
-        d.addCallback(lambda target_info:
-                      p._generate_shares( (shares_and_ids,
-                                           3, total_shares,
-                                           21, # segsize
-                                           len(CONTENTS),
-                                           target_info),
-                                          3, # seqnum
-                                          IV))
-        d.addCallback(p._send_shares, IV)
-        def _done((surprised, dispatch_map)):
-            self.failIf(surprised, "surprised!")
-        d.addCallback(_done)
-        return d
-
-class FakeRetrieve(mutable.Retrieve):
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        d = fireEventually()
-        d.addCallback(lambda res: self._storage.read(peerid, storage_index))
-        def _read(shares):
-            response = {}
-            for shnum in shares:
-                if shnums and shnum not in shnums:
-                    continue
-                vector = response[shnum] = []
-                for (offset, length) in readv:
-                    assert isinstance(offset, (int, long)), offset
-                    assert isinstance(length, (int, long)), length
-                    vector.append(shares[shnum][offset:offset+length])
-            return response
-        d.addCallback(_read)
-        return d
-
-class FakeServermapUpdater(mutable.ServermapUpdater):
-
-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
-        d = fireEventually()
-        d.addCallback(lambda res: self._storage.read(peerid, storage_index))
-        def _read(shares):
-            response = {}
-            for shnum in shares:
-                if shnums and shnum not in shnums:
-                    continue
-                vector = response[shnum] = []
-                for (offset, length) in readv:
-                    vector.append(shares[shnum][offset:offset+length])
-            return response
-        d.addCallback(_read)
-        return d
-
-    def _deserialize_pubkey(self, pubkey_s):
-        mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
-        if not mo:
-            raise RuntimeError("mangled pubkey")
-        count = mo.group(1)
-        return FakePubKey(int(count))
+    # TODO: when we publish to 20 peers, we should get one share per peer on 10
+    # when we publish to 3 peers, we should get either 3 or 4 shares per peer
+    # when we publish to zero peers, we should get a NotEnoughPeersError
 
-class Sharemap(unittest.TestCase):
+class Servermap(unittest.TestCase):
     def setUp(self):
         # publish a file and create shares, which can then be manipulated
         # later.
         num_peers = 20
         self._client = FakeClient(num_peers)
-        self._fn = FakeFilenode(self._client)
-        self._storage = FakeStorage()
-        d = self._fn.create("")
-        def _created(res):
-            p = FakePublish(self._fn)
-            p._storage = self._storage
-            contents = "New contents go here"
-            return p.publish(contents)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file("New contents go here")
+        def _created(node):
+            self._fn = node
         d.addCallback(_created)
         return d
 
-    def make_servermap(self, storage, mode=mutable.MODE_CHECK):
-        smu = FakeServermapUpdater(self._fn, mutable.ServerMap(), mode)
-        smu._storage = storage
+    def make_servermap(self, mode=mutable.MODE_CHECK):
+        smu = mutable.ServermapUpdater(self._fn, mutable.ServerMap(), mode)
         d = smu.update()
         return d
 
-    def update_servermap(self, storage, oldmap, mode=mutable.MODE_CHECK):
-        smu = FakeServermapUpdater(self._fn, oldmap, mode)
-        smu._storage = storage
+    def update_servermap(self, oldmap, mode=mutable.MODE_CHECK):
+        smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
         d = smu.update()
         return d
 
@@ -550,19 +391,18 @@ class Sharemap(unittest.TestCase):
         return sm
 
     def test_basic(self):
-        s = self._storage # unmangled
         d = defer.succeed(None)
         ms = self.make_servermap
         us = self.update_servermap
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
         # this more stops at k+epsilon, and epsilon=k, so 6 shares
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
         # this mode stops at 'k' shares
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
 
@@ -570,12 +410,12 @@ class Sharemap(unittest.TestCase):
         # increasing order of number of servers queried, since once a server
         # gets into the servermap, we'll always ask it for an update.
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
-        d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ENOUGH))
+        d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ENOUGH))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
-        d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_WRITE))
-        d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_CHECK))
+        d.addCallback(lambda sm: us(sm, mode=mutable.MODE_WRITE))
+        d.addCallback(lambda sm: us(sm, mode=mutable.MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
-        d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ANYTHING))
+        d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ANYTHING))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
 
         return d
@@ -588,21 +428,20 @@ class Sharemap(unittest.TestCase):
         self.failUnlessEqual(len(sm.shares_available()), 0)
 
     def test_no_shares(self):
-        s = self._storage
-        s._peers = {} # delete all shares
+        self._client._storage._peers = {} # delete all shares
         ms = self.make_servermap
         d = defer.succeed(None)
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
         d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
         d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
         d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
 
         return d
@@ -616,7 +455,7 @@ class Sharemap(unittest.TestCase):
         self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
 
     def test_not_quite_enough_shares(self):
-        s = self._storage
+        s = self._client._storage
         ms = self.make_servermap
         num_shares = len(s._peers)
         for peerid in s._peers:
@@ -629,13 +468,13 @@ class Sharemap(unittest.TestCase):
 
         d = defer.succeed(None)
 
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
-        d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
+        d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
 
         return d
@@ -643,28 +482,23 @@ class Sharemap(unittest.TestCase):
 
 
 class Roundtrip(unittest.TestCase):
-
     def setUp(self):
         # publish a file and create shares, which can then be manipulated
         # later.
         self.CONTENTS = "New contents go here"
         num_peers = 20
         self._client = FakeClient(num_peers)
-        self._fn = FakeFilenode(self._client)
-        self._storage = FakeStorage()
-        d = self._fn.create("")
-        def _created(res):
-            p = FakePublish(self._fn)
-            p._storage = self._storage
-            return p.publish(self.CONTENTS)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file(self.CONTENTS)
+        def _created(node):
+            self._fn = node
         d.addCallback(_created)
         return d
 
     def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
         if oldmap is None:
             oldmap = mutable.ServerMap()
-        smu = FakeServermapUpdater(self._fn, oldmap, mode)
-        smu._storage = self._storage
+        smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
         d = smu.update()
         return d
 
@@ -693,8 +527,7 @@ class Roundtrip(unittest.TestCase):
     def do_download(self, servermap, version=None):
         if version is None:
             version = servermap.best_recoverable_version()
-        r = FakeRetrieve(self._fn, servermap, version)
-        r._storage = self._storage
+        r = mutable.Retrieve(self._fn, servermap, version)
         return r.download()
 
     def test_basic(self):
@@ -796,8 +629,7 @@ class Roundtrip(unittest.TestCase):
                     allproblems = [str(f) for f in servermap.problems]
                     self.failUnless(substring in "".join(allproblems))
                 return
-            r = FakeRetrieve(self._fn, servermap, ver)
-            r._storage = self._storage
+            r = mutable.Retrieve(self._fn, servermap, ver)
             if should_succeed:
                 d1 = r.download()
                 d1.addCallback(lambda new_contents:
@@ -900,20 +732,33 @@ class Roundtrip(unittest.TestCase):
             self.failUnless("pubkey doesn't match fingerprint"
                             in str(servermap.problems[0]))
             ver = servermap.best_recoverable_version()
-            r = FakeRetrieve(self._fn, servermap, ver)
-            r._storage = self._storage
+            r = mutable.Retrieve(self._fn, servermap, ver)
             return r.download()
         d.addCallback(_do_retrieve)
         d.addCallback(lambda new_contents:
                       self.failUnlessEqual(new_contents, self.CONTENTS))
         return d
 
-    def _encode(self, c, s, fn, k, n, data):
+
+class MultipleEncodings(unittest.TestCase):
+    def setUp(self):
+        self.CONTENTS = "New contents go here"
+        num_peers = 20
+        self._client = FakeClient(num_peers)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file(self.CONTENTS)
+        def _created(node):
+            self._fn = node
+        d.addCallback(_created)
+        return d
+
+    def _encode(self, k, n, data):
         # encode 'data' into a peerid->shares dict.
 
-        fn2 = FakeFilenode(c)
+        fn2 = FastMutableFileNode(self._client)
         # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
         # and _fingerprint
+        fn = self._fn
         fn2.init_from_uri(fn.get_uri())
         # then we copy over other fields that are normally fetched from the
         # existing shares
@@ -926,9 +771,9 @@ class Roundtrip(unittest.TestCase):
         fn2._required_shares = k
         fn2._total_shares = n
 
-        p2 = FakePublish(fn2)
-        p2._storage = s
-        p2._storage._peers = {} # clear existing storage
+        s = self._client._storage
+        s._peers = {} # clear existing storage
+        p2 = mutable.Publish(fn2, None)
         d = p2.publish(data)
         def _published(res):
             shares = s._peers
@@ -937,29 +782,10 @@ class Roundtrip(unittest.TestCase):
         d.addCallback(_published)
         return d
 
-class MultipleEncodings(unittest.TestCase):
-
-    def publish(self):
-        # publish a file and create shares, which can then be manipulated
-        # later.
-        self.CONTENTS = "New contents go here"
-        num_peers = 20
-        self._client = FakeClient(num_peers)
-        self._fn = FakeFilenode(self._client)
-        self._storage = FakeStorage()
-        d = self._fn.create("")
-        def _created(res):
-            p = FakePublish(self._fn)
-            p._storage = self._storage
-            return p.publish(self.CONTENTS)
-        d.addCallback(_created)
-        return d
-
     def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
         if oldmap is None:
             oldmap = mutable.ServerMap()
-        smu = FakeServermapUpdater(self._fn, oldmap, mode)
-        smu._storage = self._storage
+        smu = mutable.ServermapUpdater(self._fn, oldmap, mode)
         d = smu.update()
         return d
 
@@ -967,8 +793,6 @@ class MultipleEncodings(unittest.TestCase):
         # we encode the same file in two different ways (3-of-10 and 4-of-9),
         # then mix up the shares, to make sure that download survives seeing
         # a variety of encodings. This is actually kind of tricky to set up.
-        c, s, fn, p, r = self.setup_for_publish(20)
-        # we ignore fn, p, and r
 
         contents1 = "Contents for encoding 1 (3-of-10) go here"
         contents2 = "Contents for encoding 2 (4-of-9) go here"
@@ -976,19 +800,19 @@ class MultipleEncodings(unittest.TestCase):
 
         # we make a retrieval object that doesn't know what encoding
         # parameters to use
-        fn3 = FakeFilenode(c)
-        fn3.init_from_uri(fn.get_uri())
+        fn3 = FastMutableFileNode(self._client)
+        fn3.init_from_uri(self._fn.get_uri())
 
         # now we upload a file through fn1, and grab its shares
-        d = self._encode(c, s, fn, 3, 10, contents1)
+        d = self._encode(3, 10, contents1)
         def _encoded_1(shares):
             self._shares1 = shares
         d.addCallback(_encoded_1)
-        d.addCallback(lambda res: self._encode(c, s, fn, 4, 9, contents2))
+        d.addCallback(lambda res: self._encode(4, 9, contents2))
         def _encoded_2(shares):
             self._shares2 = shares
         d.addCallback(_encoded_2)
-        d.addCallback(lambda res: self._encode(c, s, fn, 4, 7, contents3))
+        d.addCallback(lambda res: self._encode(4, 7, contents3))
         def _encoded_3(shares):
             self._shares3 = shares
         d.addCallback(_encoded_3)
@@ -1021,14 +845,14 @@ class MultipleEncodings(unittest.TestCase):
 
             sharemap = {}
 
-            for i,peerid in enumerate(c._peerids):
+            for i,peerid in enumerate(self._client._peerids):
                 peerid_s = shortnodeid_b2a(peerid)
                 for shnum in self._shares1.get(peerid, {}):
                     if shnum < len(places):
                         which = places[shnum]
                     else:
                         which = "x"
-                    s._peers[peerid] = peers = {}
+                    self._client._storage._peers[peerid] = peers = {}
                     in_1 = shnum in self._shares1[peerid]
                     in_2 = shnum in self._shares2.get(peerid, {})
                     in_3 = shnum in self._shares3.get(peerid, {})
@@ -1050,14 +874,10 @@ class MultipleEncodings(unittest.TestCase):
             # now sort the sequence so that share 0 is returned first
             new_sequence = [sharemap[shnum]
                             for shnum in sorted(sharemap.keys())]
-            s._sequence = new_sequence
+            self._client._storage._sequence = new_sequence
             log.msg("merge done")
         d.addCallback(_merge)
-        def _retrieve(res):
-            r3 = FakeRetrieve(fn3)
-            r3._storage = s
-            return r3.retrieve()
-        d.addCallback(_retrieve)
+        d.addCallback(lambda res: fn3.download_to_data())
         def _retrieved(new_contents):
             # the current specified behavior is "first version recoverable"
             self.failUnlessEqual(new_contents, contents1)
@@ -1147,4 +967,3 @@ class Utils(unittest.TestCase):
         c.add("v1", 1, 10, xdata[10:20], "time1")
         #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))
 
-