git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/publish.py
mutable/layout.py: make unpack_sdmf_checkstring and unpack_mdmf_checkstring more...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
index 066ee9045c22e4975db51195945c49ca9d3ee9e7..036bf51d23d91ee9ee0ac87a01e8b14406ecf7ae 100644 (file)
@@ -18,8 +18,11 @@ from foolscap.api import eventually, fireEventually
 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
      UncoordinatedWriteError, NotEnoughServersError
 from allmydata.mutable.servermap import ServerMap
-from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
-                                     SDMFSlotWriteProxy
+from allmydata.mutable.layout import get_version_from_checkstring,\
+                                     unpack_mdmf_checkstring, \
+                                     unpack_sdmf_checkstring, \
+                                     MDMFSlotWriteProxy, \
+                                     SDMFSlotWriteProxy, MDMFCHECKSTRING
 
 KiB = 1024
 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
@@ -33,6 +36,8 @@ class PublishStatus:
     def __init__(self):
         self.timings = {}
         self.timings["send_per_server"] = {}
+        self.timings["encrypt"] = 0.0
+        self.timings["encode"] = 0.0
         self.servermap = None
         self.problems = {}
         self.active = True
@@ -49,6 +54,10 @@ class PublishStatus:
         if peerid not in self.timings["send_per_server"]:
             self.timings["send_per_server"][peerid] = []
         self.timings["send_per_server"][peerid].append(elapsed)
+    def accumulate_encode_time(self, elapsed):
+        self.timings["encode"] += elapsed
+    def accumulate_encrypt_time(self, elapsed):
+        self.timings["encrypt"] += elapsed
 
     def get_started(self):
         return self.started
@@ -222,11 +231,11 @@ class Publish:
         # existing servermap.
         self.goal = set() # pairs of (peerid, shnum) tuples
 
-        # the second table is our list of outstanding queries: those which
-        # are in flight and may or may not be delivered, accepted, or
-        # acknowledged. Items are added to this table when the request is
-        # sent, and removed when the response returns (or errbacks).
-        self.outstanding = set() # (peerid, shnum) tuples
+        # the number of outstanding queries: those that are in flight and
+        # may or may not be delivered, accepted, or acknowledged. This is
+        # incremented when a query is sent, and decremented when the response
+        # returns or errbacks.
+        self.num_outstanding = 0
 
         # the third is a table of successes: share which have actually been
         # placed. These are populated when responses come back with success.
@@ -419,11 +428,11 @@ class Publish:
         # existing servermap.
         self.goal = set() # pairs of (peerid, shnum) tuples
 
-        # the second table is our list of outstanding queries: those which
-        # are in flight and may or may not be delivered, accepted, or
-        # acknowledged. Items are added to this table when the request is
-        # sent, and removed when the response returns (or errbacks).
-        self.outstanding = set() # (peerid, shnum) tuples
+        # the number of outstanding queries: those that are in flight and
+        # may or may not be delivered, accepted, or acknowledged. This is
+        # incremented when a query is sent, and decremented when the response
+        # returns or errbacks.
+        self.num_outstanding = 0
 
         # the third is a table of successes: share which have actually been
         # placed. These are populated when responses come back with success.
@@ -529,7 +538,7 @@ class Publish:
                                 "%d messages outstanding" %
                                 (len(self.placed),
                                  len(self.goal),
-                                 len(self.outstanding)))
+                                 self.num_outstanding))
         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
 
 
@@ -711,7 +720,7 @@ class Publish:
         assert len(crypttext) == len(data)
 
         now = time.time()
-        self._status.timings["encrypt"] = now - started
+        self._status.accumulate_encrypt_time(now - started)
         started = now
 
         # now apply FEC
@@ -732,7 +741,7 @@ class Publish:
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
             elapsed = time.time() - started
-            self._status.timings["encode"] = elapsed
+            self._status.accumulate_encode_time(elapsed)
             return (res, salt)
         d.addCallback(_done_encoding)
         return d
@@ -855,18 +864,17 @@ class Publish:
         ds = []
         verification_key = self._pubkey.serialize()
 
-
-        # TODO: Bad, since we remove from this same dict. We need to
-        # make a copy, or just use a non-iterated value.
-        for (shnum, writer) in self.writers.iteritems():
+        for (shnum, writer) in self.writers.copy().iteritems():
             writer.put_verification_key(verification_key)
+            self.num_outstanding += 1
+            def _no_longer_outstanding(res):
+                self.num_outstanding -= 1
+                return res
+
             d = writer.finish_publishing()
-            # Add the (peerid, shnum) tuple to our list of outstanding
-            # queries. This gets used by _loop if some of our queries
-            # fail to place shares.
-            self.outstanding.add((writer.peerid, writer.shnum))
-            d.addCallback(self._got_write_answer, writer, started)
+            d.addBoth(_no_longer_outstanding)
             d.addErrback(self._connection_problem, writer)
+            d.addCallback(self._got_write_answer, writer, started)
             ds.append(d)
         self._record_verinfo()
         self._status.timings['pack'] = time.time() - started
@@ -1094,23 +1102,35 @@ class Publish:
             self.surprised = True
             self.bad_peers.add(writer) # don't ask them again
             # use the checkstring to add information to the log message
+            unknown_format = False
             for (shnum,readv) in read_data.items():
                 checkstring = readv[0]
-                (other_seqnum,
-                 other_roothash,
-                 other_salt) = unpack_checkstring(checkstring)
+                version = get_version_from_checkstring(checkstring)
+                if version == MDMF_VERSION:
+                    (other_seqnum,
+                     other_roothash) = unpack_mdmf_checkstring(checkstring)
+                elif version == SDMF_VERSION:
+                    (other_seqnum,
+                     other_roothash,
+                     other_IV) = unpack_sdmf_checkstring(checkstring)
+                else:
+                    unknown_format = True
                 expected_version = self._servermap.version_on_peer(peerid,
                                                                    shnum)
                 if expected_version:
                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                      offsets_tuple) = expected_version
-                    self.log("somebody modified the share on us:"
-                             " shnum=%d: I thought they had #%d:R=%s,"
-                             " but testv reported #%d:R=%s" %
-                             (shnum,
-                              seqnum, base32.b2a(root_hash)[:4],
-                              other_seqnum, base32.b2a(other_roothash)[:4]),
-                             parent=lp, level=log.NOISY)
+                    msg = ("somebody modified the share on us:"
+                           " shnum=%d: I thought they had #%d:R=%s," %
+                           (shnum,
+                            seqnum, base32.b2a(root_hash)[:4]))
+                    if unknown_format:
+                        msg += (" but I don't know how to read share"
+                                " format %d" % version)
+                    else:
+                        msg += " but testv reported #%d:R=%s" % \
+                               (other_seqnum, other_roothash)
+                    self.log(msg, parent=lp, level=log.NOISY)
                 # if expected_version==None, then we didn't expect to see a
                 # share on that peer, and the 'surprise_shares' clause above
                 # will have logged it.