Remove the whitespace reported by find-trailing-spaces. No code changes.
[tahoe-lafs/tahoe-lafs.git] src/allmydata/mutable/publish.py
index e8f2ebb4751e42257535f7913e22bbc326081396..4acc2d6c3687444bafdd8736fa9a9452fe009b07 100644
@@ -15,7 +15,7 @@ from allmydata.storage.server import si_b2a
 from pycryptopp.cipher.aes import AES
 from foolscap.api import eventually, fireEventually
 
-from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
+from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
      UncoordinatedWriteError, NotEnoughServersError
 from allmydata.mutable.servermap import ServerMap
 from allmydata.mutable.layout import get_version_from_checkstring,\
@@ -51,10 +51,9 @@ class PublishStatus:
         self.started = time.time()
 
     def add_per_server_time(self, server, elapsed):
-        serverid = server.get_serverid()
-        if serverid not in self.timings["send_per_server"]:
-            self.timings["send_per_server"][serverid] = []
-        self.timings["send_per_server"][serverid].append(elapsed)
+        if server not in self.timings["send_per_server"]:
+            self.timings["send_per_server"][server] = []
+        self.timings["send_per_server"][server].append(elapsed)
     def accumulate_encode_time(self, elapsed):
         self.timings["encode"] += elapsed
     def accumulate_encrypt_time(self, elapsed):
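The rewritten add_per_server_time() keys the timing dict by the IServer object itself rather than by its serverid, and the three-line membership check is the usual dict-of-lists idiom. Purely for illustration, the same behavior in one line via dict.setdefault():

    def add_per_server_time(self, server, elapsed):
        # setdefault() returns the existing list or installs a fresh one
        self.timings["send_per_server"].setdefault(server, []).append(elapsed)
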
@@ -188,7 +187,7 @@ class Publish:
         # servermap was updated in MODE_WRITE, so we can depend upon the
         # serverlist computed by that process instead of computing our own.
         assert self._servermap
-        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
         # we will push a version that is one larger than anything present
         # in the grid, according to the servermap.
         self._new_seqnum = self._servermap.highest_seqnum() + 1
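As I understand it, MODE_REPAIR (defined in allmydata.mutable.common alongside MODE_WRITE and MODE_CHECK) marks a servermap built by a repair pass, which queries all servers much as MODE_CHECK does, so such a map is complete enough for publish to trust. A restatement of the relaxed guard as a helper; the tuple layout of get_last_update() is inferred only from the [0] indexing above:

    def _servermap_is_publishable(servermap):
        mode = servermap.get_last_update()[0]
        # MODE_REPAIR joins the two modes that were already accepted
        return mode in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
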
@@ -254,7 +253,9 @@ class Publish:
         # updating, we ignore damaged and missing shares -- callers must
         # do a repair to repair and recreate these.
         self.goal = set(self._servermap.get_known_shares())
-        self.writers = {}
+
+        # shnum -> set of IMutableSlotWriter
+        self.writers = DictOfSets()
 
         # SDMF files are updated differently.
         self._version = MDMF_VERSION
@@ -279,7 +280,7 @@ class Publish:
                                   self.segment_size,
                                   self.datalength)
 
-            self.writers.setdefault(shnum, []).append(writer)
+            self.writers.add(shnum, writer)
             writer.server = server
             known_shares = self._servermap.get_known_shares()
             assert (server, shnum) in known_shares
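self.writers changes from a plain dict of lists to DictOfSets, a small helper from allmydata.util.dictutil. The sketch below captures the two behaviors this diff depends on; see dictutil.py for the authoritative implementation:

    class DictOfSets(dict):
        # sketch of allmydata.util.dictutil.DictOfSets
        def add(self, key, value):
            # auto-creates the set, replacing the setdefault(shnum, []) calls
            if key in self:
                self[key].add(value)
            else:
                self[key] = set([value])
        def discard(self, key, value):
            # safe to call for an absent writer, unlike list.remove()
            if key not in self:
                return
            self[key].discard(value)
            if not self[key]:
                # an emptied key disappears entirely, so len(d) counts
                # only keys that still hold at least one value
                del self[key]
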
@@ -295,7 +296,7 @@ class Publish:
         # after we are done writing share data and have started to write
         # blocks. In the meantime, we need to know what to look for when
         # writing, so that we can detect UncoordinatedWriteErrors.
-        self._checkstring = self.writers.values()[0][0].get_checkstring()
+        self._checkstring = self._get_some_writer().get_checkstring()
 
         # Now, we start pushing shares.
         self._status.timings["setup"] = time.time() - self._started
@@ -331,7 +332,7 @@ class Publish:
         # These are filled in later, after we've modified the block hash
         # tree suitably.
         self.sharehash_leaves = None # eventually [sharehashes]
-        self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                               # validate the share]
 
         self.log("Starting push")
@@ -374,7 +375,7 @@ class Publish:
         # servermap was updated in MODE_WRITE, so we can depend upon the
         # serverlist computed by that process instead of computing our own.
         if self._servermap:
-            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
             # we will push a version that is one larger than anything present
             # in the grid, according to the servermap.
             self._new_seqnum = self._servermap.highest_seqnum() + 1
@@ -453,7 +454,10 @@ class Publish:
 
         # TODO: Make this part do server selection.
         self.update_goal()
-        self.writers = {}
+
+        # shnum -> set of IMutableSlotWriter
+        self.writers = DictOfSets()
+
         if self._version == MDMF_VERSION:
             writer_class = MDMFSlotWriteProxy
         else:
@@ -477,7 +481,7 @@ class Publish:
                                    self.total_shares,
                                    self.segment_size,
                                    self.datalength)
-            self.writers.setdefault(shnum, []).append(writer)
+            self.writers.add(shnum, writer)
             writer.server = server
             known_shares = self._servermap.get_known_shares()
             if (server, shnum) in known_shares:
@@ -496,7 +500,7 @@ class Publish:
         # after we are done writing share data and have started to write
         # blocks. In the meantime, we need to know what to look for when
         # writing, so that we can detect UncoordinatedWriteErrors.
-        self._checkstring = self.writers.values()[0][0].get_checkstring()
+        self._checkstring = self._get_some_writer().get_checkstring()
 
         # Now, we start pushing shares.
         self._status.timings["setup"] = time.time() - self._started
@@ -512,7 +516,7 @@ class Publish:
             for j in xrange(self.num_segments):
                 blocks.append(None)
         self.sharehash_leaves = None # eventually [sharehashes]
-        self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                               # validate the share]
 
         self.log("Starting push")
@@ -522,6 +526,8 @@ class Publish:
 
         return self.done_deferred
 
+    def _get_some_writer(self):
+        return list(self.writers.values()[0])[0]
 
     def _update_status(self):
         self._status.set_status("Sending Shares: %d placed out of %d, "
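All writers for the new version share the same checkstring, signable prefix, and verinfo, so _get_some_writer() just grabs an arbitrary one; the list(...) wrapper is needed because each value is now a set, and writers.values()[0] is the Python 2 spelling (values() returns a list there). An equivalent, slightly more defensive spelling, purely illustrative:

    def _get_some_writer(self):
        # any writer will do; they all describe the same new version
        for writers in self.writers.itervalues():
            for writer in writers:
                return writer
        raise AssertionError("no writers left to ask")
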
@@ -623,9 +629,8 @@ class Publish:
         # Can we still successfully publish this file?
         # TODO: Keep track of outstanding queries before aborting the
         #       process.
-        all_shnums = filter(lambda sh: len(self.writers[sh]) > 0,
-                            self.writers.iterkeys())
-        if len(all_shnums) < self.required_shares or self.surprised:
+        num_shnums = len(self.writers)
+        if num_shnums < self.required_shares or self.surprised:
             return self._failure()
 
         # Figure out what we need to do next. Each of these needs to
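The filter over empty writer lists is no longer needed: DictOfSets.discard() (used by _connection_problem() below) deletes a shnum's key the moment its last writer is removed, so len(self.writers) already counts only shnums that can still be placed. A small demonstration of that invariant, assuming the DictOfSets semantics sketched earlier:

    writers = DictOfSets()
    writers.add(0, "w0a"); writers.add(0, "w0b"); writers.add(1, "w1")
    assert len(writers) == 2   # two shnums still have live writers
    writers.discard(1, "w1")   # the only writer for shnum 1 fails
    assert len(writers) == 1   # the emptied key vanished with it
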
@@ -836,7 +841,7 @@ class Publish:
         uncoordinated writes. SDMF files will have the same checkstring,
         so we need not do anything.
         """
-        self._checkstring = self.writers.values()[0][0].get_checkstring()
+        self._checkstring = self._get_some_writer().get_checkstring()
 
 
     def _make_and_place_signature(self):
@@ -845,7 +850,7 @@ class Publish:
         """
         started = time.time()
         self._status.set_status("Signing prefix")
-        signable = self.writers.values()[0][0].get_signable()
+        signable = self._get_some_writer().get_signable()
         self.signature = self._privkey.sign(signable)
 
         for (shnum, writers) in self.writers.iteritems():
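The signing step asks one writer for the canonical signable prefix, signs it once, then fans the signature out to every writer; with DictOfSets the fan-out needs a nested loop over each shnum's set. A hedged sketch of the whole step (put_signature() is the writer-proxy method I would expect here; treat the exact name as an assumption):

    signable = self._get_some_writer().get_signable()
    self.signature = self._privkey.sign(signable)   # one signature covers all shares
    for shnum, writers in self.writers.iteritems():
        for writer in writers:                      # a set of writers per shnum
            writer.put_signature(self.signature)
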
@@ -882,7 +887,7 @@ class Publish:
 
 
     def _record_verinfo(self):
-        self.versioninfo = self.writers.values()[0][0].get_verinfo()
+        self.versioninfo = self._get_some_writer().get_verinfo()
 
 
     def _connection_problem(self, f, writer):
@@ -892,7 +897,7 @@ class Publish:
         """
         self.log("found problem: %s" % str(f))
         self._last_failure = f
-        self.writers[writer.shnum].remove(writer)
+        self.writers.discard(writer.shnum, writer)
 
 
     def log_goal(self, goal, message=""):
@@ -1225,7 +1230,7 @@ class MutableFileHandle:
             old_position = self._filehandle.tell()
             # Seek to the end of the file by seeking 0 bytes from the
             # file's end
-            self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
+            self._filehandle.seek(0, os.SEEK_END)
             self._size = self._filehandle.tell()
             # Restore the previous position, in case this was called
             # after a read.
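os.SEEK_END has been available since Python 2.5, so the bare constant 2 and its explanatory comment can go; this does assume os is imported at the top of publish.py. The surrounding pattern, learning a filehandle's size without disturbing the caller's read position, restated standalone:

    import os

    def filehandle_size(fh):
        old_position = fh.tell()     # remember where the caller was reading
        fh.seek(0, os.SEEK_END)      # jump to the end to measure total size
        size = fh.tell()
        fh.seek(old_position)        # restore, so a later read() is unaffected
        return size
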
@@ -1319,7 +1324,7 @@ class TransformingUploadable:
 
 
     def read(self, length):
-        # We can get data from 3 sources here. 
+        # We can get data from 3 sources here.
         #   1. The first of the segments provided to us.
         #   2. The data that we're replacing things with.
         #   3. The last of the segments provided to us.