cancel_secret = self._node.get_cancel_secret(server)
secrets = (write_enabler, renew_secret, cancel_secret)
- self.writers[shnum] = writer_class(shnum,
- server.get_rref(),
- self._storage_index,
- secrets,
- self._new_seqnum,
- self.required_shares,
- self.total_shares,
- self.segment_size,
- self.datalength)
- self.writers[shnum].server = server
+ writer = writer_class(shnum,
+ server.get_rref(),
+ self._storage_index,
+ secrets,
+ self._new_seqnum,
+ self.required_shares,
+ self.total_shares,
+ self.segment_size,
+ self.datalength)
+
+ self.writers.setdefault(shnum, []).append(writer)
+ writer.server = server
known_shares = self._servermap.get_known_shares()
assert (server, shnum) in known_shares
old_versionid, old_timestamp = known_shares[(server,shnum)]
(old_seqnum, old_root_hash, old_salt, old_segsize,
old_datalength, old_k, old_N, old_prefix,
old_offsets_tuple) = old_versionid
- self.writers[shnum].set_checkstring(old_seqnum,
- old_root_hash,
- old_salt)
+ writer.set_checkstring(old_seqnum,
+ old_root_hash,
+ old_salt)
# Our remote shares will not have a complete checkstring until
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
- self._checkstring = self.writers.values()[0].get_checkstring()
+ self._checkstring = self.writers.values()[0][0].get_checkstring()
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
cancel_secret = self._node.get_cancel_secret(server)
secrets = (write_enabler, renew_secret, cancel_secret)
- self.writers[shnum] = writer_class(shnum,
- server.get_rref(),
- self._storage_index,
- secrets,
- self._new_seqnum,
- self.required_shares,
- self.total_shares,
- self.segment_size,
- self.datalength)
- self.writers[shnum].server = server
+ writer = writer_class(shnum,
+ server.get_rref(),
+ self._storage_index,
+ secrets,
+ self._new_seqnum,
+ self.required_shares,
+ self.total_shares,
+ self.segment_size,
+ self.datalength)
+ self.writers.setdefault(shnum, []).append(writer)
+ writer.server = server
known_shares = self._servermap.get_known_shares()
if (server, shnum) in known_shares:
old_versionid, old_timestamp = known_shares[(server,shnum)]
(old_seqnum, old_root_hash, old_salt, old_segsize,
old_datalength, old_k, old_N, old_prefix,
old_offsets_tuple) = old_versionid
- self.writers[shnum].set_checkstring(old_seqnum,
- old_root_hash,
- old_salt)
+ writer.set_checkstring(old_seqnum,
+ old_root_hash,
+ old_salt)
elif (server, shnum) in self.bad_share_checkstrings:
old_checkstring = self.bad_share_checkstrings[(server, shnum)]
- self.writers[shnum].set_checkstring(old_checkstring)
+ writer.set_checkstring(old_checkstring)
# Our remote shares will not have a complete checkstring until
# after we are done writing share data and have started to write
# blocks. In the meantime, we need to know what to look for when
# writing, so that we can detect UncoordinatedWriteErrors.
- self._checkstring = self.writers.values()[0].get_checkstring()
+ self._checkstring = self.writers.values()[0][0].get_checkstring()
# Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
# Can we still successfully publish this file?
# TODO: Keep track of outstanding queries before aborting the
# process.
- if len(self.writers) < self.required_shares or self.surprised:
+ # Count distinct shares that still have at least one live writer.
+ # Counting individual writers would overstate coverage: with the
+ # dict-of-lists layout, several writers can be pushing the SAME
+ # shnum to different servers, but only distinct shares count
+ # toward the required k.
+ placed_shares = [shnum for (shnum, writers) in self.writers.iteritems()
+                  if writers]
+ if len(placed_shares) < self.required_shares or self.surprised:
return self._failure()
# Figure out what we need to do next. Each of these needs to
salt = os.urandom(16)
assert self._version == SDMF_VERSION
- for writer in self.writers.itervalues():
- writer.put_salt(salt)
+ for shnum, writers in self.writers.iteritems():
+ for writer in writers:
+ writer.put_salt(salt)
def _encode_segment(self, segnum):
block_hash = hashutil.block_hash(hashed)
self.blockhashes[shareid][segnum] = block_hash
# find the writer for this share
- writer = self.writers[shareid]
- writer.put_block(sharedata, segnum, salt)
+ writers = self.writers[shareid]
+ for writer in writers:
+ writer.put_block(sharedata, segnum, salt)
def push_everything_else(self):
def push_encprivkey(self):
encprivkey = self._encprivkey
self._status.set_status("Pushing encrypted private key")
- for writer in self.writers.itervalues():
- writer.put_encprivkey(encprivkey)
+ for shnum, writers in self.writers.iteritems():
+ for writer in writers:
+ writer.put_encprivkey(encprivkey)
def push_blockhashes(self):
# set the leaf for future use.
self.sharehash_leaves[shnum] = t[0]
- writer = self.writers[shnum]
- writer.put_blockhashes(self.blockhashes[shnum])
+ writers = self.writers[shnum]
+ for writer in writers:
+ writer.put_blockhashes(self.blockhashes[shnum])
def push_sharehashes(self):
needed_indices = share_hash_tree.needed_hashes(shnum)
self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
for i in needed_indices] )
- writer = self.writers[shnum]
- writer.put_sharehashes(self.sharehashes[shnum])
+ writers = self.writers[shnum]
+ for writer in writers:
+ writer.put_sharehashes(self.sharehashes[shnum])
self.root_hash = share_hash_tree[0]
# - Push the signature
self._status.set_status("Pushing root hashes and signature")
for shnum in xrange(self.total_shares):
- writer = self.writers[shnum]
- writer.put_root_hash(self.root_hash)
+ writers = self.writers[shnum]
+ for writer in writers:
+ writer.put_root_hash(self.root_hash)
self._update_checkstring()
self._make_and_place_signature()
uncoordinated writes. SDMF files will have the same checkstring,
so we need not do anything.
"""
- self._checkstring = self.writers.values()[0].get_checkstring()
+ self._checkstring = self.writers.values()[0][0].get_checkstring()
def _make_and_place_signature(self):
"""
started = time.time()
self._status.set_status("Signing prefix")
- signable = self.writers[0].get_signable()
+ signable = self.writers.values()[0][0].get_signable()
self.signature = self._privkey.sign(signable)
- for (shnum, writer) in self.writers.iteritems():
- writer.put_signature(self.signature)
+ for (shnum, writers) in self.writers.iteritems():
+ for writer in writers:
+ writer.put_signature(self.signature)
self._status.timings['sign'] = time.time() - started
ds = []
verification_key = self._pubkey.serialize()
- for (shnum, writer) in self.writers.copy().iteritems():
- writer.put_verification_key(verification_key)
- self.num_outstanding += 1
- def _no_longer_outstanding(res):
- self.num_outstanding -= 1
- return res
-
- d = writer.finish_publishing()
- d.addBoth(_no_longer_outstanding)
- d.addErrback(self._connection_problem, writer)
- d.addCallback(self._got_write_answer, writer, started)
- ds.append(d)
+ for (shnum, writers) in self.writers.copy().iteritems():
+ for writer in writers:
+ writer.put_verification_key(verification_key)
+ self.num_outstanding += 1
+ def _no_longer_outstanding(res):
+ self.num_outstanding -= 1
+ return res
+
+ d = writer.finish_publishing()
+ d.addBoth(_no_longer_outstanding)
+ d.addErrback(self._connection_problem, writer)
+ d.addCallback(self._got_write_answer, writer, started)
+ ds.append(d)
self._record_verinfo()
self._status.timings['pack'] = time.time() - started
return defer.DeferredList(ds)
def _record_verinfo(self):
- self.versioninfo = self.writers.values()[0].get_verinfo()
+ self.versioninfo = self.writers.values()[0][0].get_verinfo()
def _connection_problem(self, f, writer):
"""
self.log("found problem: %s" % str(f))
self._last_failure = f
- del(self.writers[writer.shnum])
+ self.writers[writer.shnum].remove(writer)
+ # If that was the last writer for this share, drop the now-empty
+ # list entirely.  Leaving an empty list behind would (a) inflate
+ # share-coverage counts taken over self.writers and (b) allow
+ # self.writers.values()[0][0] accesses elsewhere to raise
+ # IndexError when the empty list happens to come first.
+ if not self.writers[writer.shnum]:
+     del self.writers[writer.shnum]
def log_goal(self, goal, message=""):
# knowingly also writing to that server from other writers.
# TODO: Precompute this.
- known_shnums = [x.shnum for x in self.writers.values()
- if x.server == server]
- surprise_shares -= set(known_shnums)
+ shares = []
+ for shnum, writers in self.writers.iteritems():
+ shares.extend([x.shnum for x in writers if x.server == server])
+ known_shnums = set(shares)
+ surprise_shares -= known_shnums
self.log("found the following surprise shares: %s" %
str(surprise_shares))