from pycryptopp.cipher.aes import AES
from foolscap.api import eventually, fireEventually
-from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
+from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
from allmydata.mutable.layout import get_version_from_checkstring,\
self.started = time.time()
def add_per_server_time(self, server, elapsed):
- serverid = server.get_serverid()
- if serverid not in self.timings["send_per_server"]:
- self.timings["send_per_server"][serverid] = []
- self.timings["send_per_server"][serverid].append(elapsed)
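+ # key the timings by the server object itself, not by its serverid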
+ if server not in self.timings["send_per_server"]:
+ self.timings["send_per_server"][server] = []
+ self.timings["send_per_server"][server].append(elapsed)
def accumulate_encode_time(self, elapsed):
self.timings["encode"] += elapsed
def accumulate_encrypt_time(self, elapsed):
# servermap was updated in MODE_WRITE, so we can depend upon the
# serverlist computed by that process instead of computing our own.
assert self._servermap
- assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+ assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
# we will push a version that is one larger than anything present
# in the grid, according to the servermap.
self._new_seqnum = self._servermap.highest_seqnum() + 1
# updating, we ignore damaged and missing shares -- callers must
# run a repair to fix or recreate these.
self.goal = set(self._servermap.get_known_shares())
- self.writers = {}
+
+ # shnum -> set of IMutableSlotWriter
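+ # (DictOfSets, from allmydata.util.dictutil, maps each key to a set:
+ # .add() creates the set on first use, and .discard() removes the key
+ # entirely once its set becomes empty)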
+ self.writers = DictOfSets()
# SDMF files are updated differently.
self._version = MDMF_VERSION
self.segment_size,
self.datalength)
- self.writers.setdefault(shnum, []).append(writer)
+ self.writers.add(shnum, writer)
writer.server = server
known_shares = self._servermap.get_known_shares()
assert (server, shnum) in known_shares
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
- self._checkstring = self.writers.values()[0][0].get_checkstring()
+ self._checkstring = self._get_some_writer().get_checkstring()
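+ # (a checkstring packs the seqnum and root hash -- plus the salt, for
+ # SDMF -- of the version we expect each server to be holding)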
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
# These are filled in later, after we've modified the block hash
# tree suitably.
self.sharehash_leaves = None # eventually [sharehashes]
- self.sharehashes = {} # shnum -> [sharehash leaves necessary to
+ self.sharehashes = {} # shnum -> [sharehash leaves necessary to
# validate the share]
self.log("Starting push")
# servermap was updated in MODE_WRITE, so we can depend upon the
# serverlist computed by that process instead of computing our own.
if self._servermap:
- assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
+ assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
# we will push a version that is one larger than anything present
# in the grid, according to the servermap.
self._new_seqnum = self._servermap.highest_seqnum() + 1
# TODO: Make this part do server selection.
self.update_goal()
- self.writers = {}
+
+ # shnum -> set of IMutableSlotWriter
+ self.writers = DictOfSets()
+
if self._version == MDMF_VERSION:
writer_class = MDMFSlotWriteProxy
else:
self.total_shares,
self.segment_size,
self.datalength)
- self.writers.setdefault(shnum, []).append(writer)
+ self.writers.add(shnum, writer)
writer.server = server
known_shares = self._servermap.get_known_shares()
if (server, shnum) in known_shares:
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
- self._checkstring = self.writers.values()[0][0].get_checkstring()
+ self._checkstring = self._get_some_writer().get_checkstring()
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
for j in xrange(self.num_segments):
blocks.append(None)
self.sharehash_leaves = None # eventually [sharehashes]
- self.sharehashes = {} # shnum -> [sharehash leaves necessary to
+ self.sharehashes = {} # shnum -> [sharehash leaves necessary to
# validate the share]
self.log("Starting push")
return self.done_deferred
+ def _get_some_writer(self):
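+ # every writer in this upload shares the same checkstring, signable
+ # prefix, and verinfo, so an arbitrary one will do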
+ return list(self.writers.values()[0])[0]
def _update_status(self):
self._status.set_status("Sending Shares: %d placed out of %d, "
# Can we still successfully publish this file?
# TODO: Keep track of outstanding queries before aborting the
# process.
- all_shnums = filter(lambda sh: len(self.writers[sh]) > 0,
- self.writers.iterkeys())
- if len(all_shnums) < self.required_shares or self.surprised:
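+ # (discarding the last writer for a shnum prunes that shnum from
+ # self.writers entirely, so len() counts only shnums with at least
+ # one live writer)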
+ num_shnums = len(self.writers)
+ if num_shnums < self.required_shares or self.surprised:
return self._failure()
# Figure out what we need to do next. Each of these needs to
uncoordinated writes. SDMF files will have the same checkstring,
so we need not do anything.
"""
- self._checkstring = self.writers.values()[0][0].get_checkstring()
+ self._checkstring = self._get_some_writer().get_checkstring()
def _make_and_place_signature(self):
"""
started = time.time()
self._status.set_status("Signing prefix")
- signable = self.writers.values()[0][0].get_signable()
+ signable = self._get_some_writer().get_signable()
self.signature = self._privkey.sign(signable)
for (shnum, writers) in self.writers.iteritems():
def _record_verinfo(self):
- self.versioninfo = self.writers.values()[0][0].get_verinfo()
+ self.versioninfo = self._get_some_writer().get_verinfo()
def _connection_problem(self, f, writer):
"""
self.log("found problem: %s" % str(f))
self._last_failure = f
- self.writers[writer.shnum].remove(writer)
+ self.writers.discard(writer.shnum, writer)
def log_goal(self, goal, message=""):
old_position = self._filehandle.tell()
# Seek to the end of the file by seeking 0 bytes from the
# file's end
- self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
+ self._filehandle.seek(0, os.SEEK_END)
self._size = self._filehandle.tell()
# Restore the previous position, in case this was called
# after a read.
def read(self, length):
- # We can get data from 3 sources here.
+ # We can get data from 3 sources here.
# 1. The first of the segments provided to us.
# 2. The data that we're replacing things with.
# 3. The last of the segments provided to us.