-import os, struct, time
+import os, time
+from StringIO import StringIO
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
-from allmydata.interfaces import IPublishStatus
+from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
+ IMutableUploadable
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
-from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
- unpack_checkstring, SIGNED_PREFIX
+from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
+ SDMFSlotWriteProxy
+
+KiB = 1024
+DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
+PUSHING_BLOCKS_STATE = 0
+PUSHING_EVERYTHING_ELSE_STATE = 1
+DONE_STATE = 2
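+
+# The publisher runs as a small state machine: push all segment blocks
+# first (PUSHING_BLOCKS_STATE), then the remaining share pieces -- hash
+# chains, signature, verification key (PUSHING_EVERYTHING_ELSE_STATE),
+# and finally finish (DONE_STATE). _push() dispatches on self._state.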
class PublishStatus:
implements(IPublishStatus)
self._log_number = num
self._running = True
self._first_write_error = None
+ self._last_failure = None
self._status = PublishStatus()
self._status.set_storage_index(self._storage_index)
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
+ self._version = self._node.get_version()
+ assert self._version in (SDMF_VERSION, MDMF_VERSION)
+
def get_status(self):
return self._status
kwargs["facility"] = "tahoe.mutable.publish"
return log.msg(*args, **kwargs)
+
+ def update(self, data, offset, blockhashes, version):
+ """
+ I replace the contents of this file with the contents of data,
+ starting at offset. I return a Deferred that fires with None
+ when the replacement has been completed, or with an error if
+ something went wrong during the process.
+
+ Note that this process will not upload new shares. If the file
+ being updated is in need of repair, callers will have to repair
+ it on their own.
+ """
+ # How this works:
+        # 1: Make peer assignments. We'll assign each share that we know
+        #    about on the grid to the peer that currently holds that
+        #    share, and will not place any new shares.
+        # 2: Set up encoding parameters. Most of these will stay the same
+ # -- datalength will change, as will some of the offsets.
+ # 3. Upload the new segments.
+ # 4. Be done.
+ assert IMutableUploadable.providedBy(data)
+
+ self.data = data
+
+ # XXX: Use the MutableFileVersion instead.
+ self.datalength = self._node.get_size()
+ if data.get_size() > self.datalength:
+ self.datalength = data.get_size()
+
+ self.log("starting update")
+ self.log("adding new data of length %d at offset %d" % \
+ (data.get_size(), offset))
+ self.log("new data length is %d" % self.datalength)
+ self._status.set_size(self.datalength)
+ self._status.set_status("Started")
+ self._started = time.time()
+
+ self.done_deferred = defer.Deferred()
+
+ self._writekey = self._node.get_writekey()
+ assert self._writekey, "need write capability to publish"
+
+ # first, which servers will we publish to? We require that the
+ # servermap was updated in MODE_WRITE, so we can depend upon the
+ # peerlist computed by that process instead of computing our own.
+ assert self._servermap
+ assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
+ # we will push a version that is one larger than anything present
+ # in the grid, according to the servermap.
+ self._new_seqnum = self._servermap.highest_seqnum() + 1
+ self._status.set_servermap(self._servermap)
+
+ self.log(format="new seqnum will be %(seqnum)d",
+ seqnum=self._new_seqnum, level=log.NOISY)
+
+ # We're updating an existing file, so all of the following
+ # should be available.
+ self.readkey = self._node.get_readkey()
+ self.required_shares = self._node.get_required_shares()
+ assert self.required_shares is not None
+ self.total_shares = self._node.get_total_shares()
+ assert self.total_shares is not None
+ self._status.set_encoding(self.required_shares, self.total_shares)
+
+ self._pubkey = self._node.get_pubkey()
+ assert self._pubkey
+ self._privkey = self._node.get_privkey()
+ assert self._privkey
+ self._encprivkey = self._node.get_encprivkey()
+
+ sb = self._storage_broker
+ full_peerlist = [(s.get_serverid(), s.get_rref())
+ for s in sb.get_servers_for_psi(self._storage_index)]
+ self.full_peerlist = full_peerlist # for use later, immutable
+ self.bad_peers = set() # peerids who have errbacked/refused requests
+
+ # This will set self.segment_size, self.num_segments, and
+ # self.fec. TODO: Does it know how to do the offset? Probably
+ # not. So do that part next.
+ self.setup_encoding_parameters(offset=offset)
+
+ # if we experience any surprises (writes which were rejected because
+ # our test vector did not match, or shares which we didn't expect to
+ # see), we set this flag and report an UncoordinatedWriteError at the
+ # end of the publish process.
+ self.surprised = False
+
+ # we keep track of three tables. The first is our goal: which share
+ # we want to see on which servers. This is initially populated by the
+ # existing servermap.
+ self.goal = set() # pairs of (peerid, shnum) tuples
+
+ # the second table is our list of outstanding queries: those which
+ # are in flight and may or may not be delivered, accepted, or
+ # acknowledged. Items are added to this table when the request is
+ # sent, and removed when the response returns (or errbacks).
+ self.outstanding = set() # (peerid, shnum) tuples
+
+        # the third is a table of successes: shares which have actually been
+ # placed. These are populated when responses come back with success.
+ # When self.placed == self.goal, we're done.
+ self.placed = set() # (peerid, shnum) tuples
+
+ # we also keep a mapping from peerid to RemoteReference. Each time we
+ # pull a connection out of the full peerlist, we add it to this for
+ # use later.
+ self.connections = {}
+
+ self.bad_share_checkstrings = {}
+
+ # This is set at the last step of the publishing process.
+ self.versioninfo = ""
+
+ # we use the servermap to populate the initial goal: this way we will
+ # try to update each existing share in place. Since we're
+ # updating, we ignore damaged and missing shares -- callers must
+        # run a repair to recreate them.
+ for (peerid, shnum) in self._servermap.servermap:
+ self.goal.add( (peerid, shnum) )
+ self.connections[peerid] = self._servermap.connections[peerid]
+ self.writers = {}
+
+        # SDMF files are updated differently, so this update path always
+        # uses the MDMF layout and writer.
+ self._version = MDMF_VERSION
+ writer_class = MDMFSlotWriteProxy
+
+ # For each (peerid, shnum) in self.goal, we make a
+ # write proxy for that peer. We'll use this to write
+ # shares to the peer.
+ for key in self.goal:
+ peerid, shnum = key
+ write_enabler = self._node.get_write_enabler(peerid)
+ renew_secret = self._node.get_renewal_secret(peerid)
+ cancel_secret = self._node.get_cancel_secret(peerid)
+ secrets = (write_enabler, renew_secret, cancel_secret)
+
+ self.writers[shnum] = writer_class(shnum,
+ self.connections[peerid],
+ self._storage_index,
+ secrets,
+ self._new_seqnum,
+ self.required_shares,
+ self.total_shares,
+ self.segment_size,
+ self.datalength)
+ self.writers[shnum].peerid = peerid
+ assert (peerid, shnum) in self._servermap.servermap
+ old_versionid, old_timestamp = self._servermap.servermap[key]
+ (old_seqnum, old_root_hash, old_salt, old_segsize,
+ old_datalength, old_k, old_N, old_prefix,
+ old_offsets_tuple) = old_versionid
+ self.writers[shnum].set_checkstring(old_seqnum,
+ old_root_hash,
+ old_salt)
+
+ # Our remote shares will not have a complete checkstring until
+ # after we are done writing share data and have started to write
+ # blocks. In the meantime, we need to know what to look for when
+ # writing, so that we can detect UncoordinatedWriteErrors.
+ self._checkstring = self.writers.values()[0].get_checkstring()
+
+ # Now, we start pushing shares.
+ self._status.timings["setup"] = time.time() - self._started
+        # First, we encrypt, encode, and publish the segments that need
+        # to change.
+
+ # Our update process fetched these for us. We need to update
+ # them in place as publishing happens.
+        self.blockhashes = {} # (shnum, [blockhashes])
+ for (i, bht) in blockhashes.iteritems():
+ # We need to extract the leaves from our old hash tree.
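+            # (version is a verinfo tuple: version[3] is the old
+            # segment size and version[4] the old datalength, so this
+            # recovers the old segment count.)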
+ old_segcount = mathutil.div_ceil(version[4],
+ version[3])
+ h = hashtree.IncompleteHashTree(old_segcount)
+ bht = dict(enumerate(bht))
+ h.set_hashes(bht)
+ leaves = h[h.get_leaf_index(0):]
+ for j in xrange(self.num_segments - len(leaves)):
+ leaves.append(None)
+
+ assert len(leaves) >= self.num_segments
+ self.blockhashes[i] = leaves
+ # This list will now be the leaves that were set during the
+ # initial upload + enough empty hashes to make it a
+ # power-of-two. If we exceed a power of two boundary, we
+ # should be encoding the file over again, and should not be
+ # here. So, we have
+ #assert len(self.blockhashes[i]) == \
+ # hashtree.roundup_pow2(self.num_segments), \
+ # len(self.blockhashes[i])
+ # XXX: Except this doesn't work. Figure out why.
+
+ # These are filled in later, after we've modified the block hash
+ # tree suitably.
+ self.sharehash_leaves = None # eventually [sharehashes]
+ self.sharehashes = {} # shnum -> [sharehash leaves necessary to
+ # validate the share]
+
+ self.log("Starting push")
+
+ self._state = PUSHING_BLOCKS_STATE
+ self._push()
+
+ return self.done_deferred
+
+
def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
simultaneous write.
"""
- # 1: generate shares (SDMF: files are small, so we can do it in RAM)
- # 2: perform peer selection, get candidate servers
- # 2a: send queries to n+epsilon servers, to determine current shares
- # 2b: based upon responses, create target map
- # 3: send slot_testv_and_readv_and_writev messages
- # 4: as responses return, update share-dispatch table
- # 4a: may need to run recovery algorithm
- # 5: when enough responses are back, we're done
-
- self.log("starting publish, datalen is %s" % len(newdata))
- self._status.set_size(len(newdata))
+        # 0. Set up encoding parameters, encoder, and other such things.
+ # 1. Encrypt, encode, and publish segments.
+ assert IMutableUploadable.providedBy(newdata)
+
+ self.data = newdata
+ self.datalength = newdata.get_size()
+ #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
+ # self._version = MDMF_VERSION
+ #else:
+ # self._version = SDMF_VERSION
+
+ self.log("starting publish, datalen is %s" % self.datalength)
+ self._status.set_size(self.datalength)
self._status.set_status("Started")
self._started = time.time()
self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests
- self.newdata = newdata
- self.salt = os.urandom(16)
-
+ # This will set self.segment_size, self.num_segments, and
+ # self.fec.
self.setup_encoding_parameters()
# if we experience any surprises (writes which were rejected because
# end of the publish process.
self.surprised = False
- # as a failsafe, refuse to iterate through self.loop more than a
- # thousand times.
- self.looplimit = 1000
-
# we keep track of three tables. The first is our goal: which share
# we want to see on which servers. This is initially populated by the
# existing servermap.
self.bad_share_checkstrings = {}
+ # This is set at the last step of the publishing process.
+ self.versioninfo = ""
+
# we use the servermap to populate the initial goal: this way we will
# try to update each existing share in place.
for (peerid, shnum) in self._servermap.servermap:
self.bad_share_checkstrings[key] = old_checkstring
self.connections[peerid] = self._servermap.connections[peerid]
- # create the shares. We'll discard these as they are delivered. SDMF:
- # we're allowed to hold everything in memory.
+ # TODO: Make this part do peer selection.
+ self.update_goal()
+ self.writers = {}
+ if self._version == MDMF_VERSION:
+ writer_class = MDMFSlotWriteProxy
+ else:
+ writer_class = SDMFSlotWriteProxy
+
+ # For each (peerid, shnum) in self.goal, we make a
+ # write proxy for that peer. We'll use this to write
+ # shares to the peer.
+ for key in self.goal:
+ peerid, shnum = key
+ write_enabler = self._node.get_write_enabler(peerid)
+ renew_secret = self._node.get_renewal_secret(peerid)
+ cancel_secret = self._node.get_cancel_secret(peerid)
+ secrets = (write_enabler, renew_secret, cancel_secret)
+ self.writers[shnum] = writer_class(shnum,
+ self.connections[peerid],
+ self._storage_index,
+ secrets,
+ self._new_seqnum,
+ self.required_shares,
+ self.total_shares,
+ self.segment_size,
+ self.datalength)
+ self.writers[shnum].peerid = peerid
+ if (peerid, shnum) in self._servermap.servermap:
+ old_versionid, old_timestamp = self._servermap.servermap[key]
+ (old_seqnum, old_root_hash, old_salt, old_segsize,
+ old_datalength, old_k, old_N, old_prefix,
+ old_offsets_tuple) = old_versionid
+ self.writers[shnum].set_checkstring(old_seqnum,
+ old_root_hash,
+ old_salt)
+ elif (peerid, shnum) in self.bad_share_checkstrings:
+ old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
+ self.writers[shnum].set_checkstring(old_checkstring)
+
+ # Our remote shares will not have a complete checkstring until
+ # after we are done writing share data and have started to write
+ # blocks. In the meantime, we need to know what to look for when
+ # writing, so that we can detect UncoordinatedWriteErrors.
+ self._checkstring = self.writers.values()[0].get_checkstring()
+
+ # Now, we start pushing shares.
self._status.timings["setup"] = time.time() - self._started
- d = self._encrypt_and_encode()
- d.addCallback(self._generate_shares)
- def _start_pushing(res):
- self._started_pushing = time.time()
- return res
- d.addCallback(_start_pushing)
- d.addCallback(self.loop) # trigger delivery
- d.addErrback(self._fatal_error)
+        # First, we encrypt, encode, and publish each segment of the
+        # file.
+
+ # This will eventually hold the block hash chain for each share
+ # that we publish. We define it this way so that empty publishes
+ # will still have something to write to the remote slot.
+ self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
+ for i in xrange(self.total_shares):
+ blocks = self.blockhashes[i]
+ for j in xrange(self.num_segments):
+ blocks.append(None)
+ self.sharehash_leaves = None # eventually [sharehashes]
+ self.sharehashes = {} # shnum -> [sharehash leaves necessary to
+ # validate the share]
+
+ self.log("Starting push")
+
+ self._state = PUSHING_BLOCKS_STATE
+ self._push()
return self.done_deferred
- def setup_encoding_parameters(self):
- segment_size = len(self.newdata)
+
+ def _update_status(self):
+ self._status.set_status("Sending Shares: %d placed out of %d, "
+ "%d messages outstanding" %
+ (len(self.placed),
+ len(self.goal),
+ len(self.outstanding)))
+ self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
+
+ def setup_encoding_parameters(self, offset=0):
+ if self._version == MDMF_VERSION:
+ segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
+ else:
+ segment_size = self.datalength # SDMF is only one segment
# this must be a multiple of self.required_shares
segment_size = mathutil.next_multiple(segment_size,
self.required_shares)
self.segment_size = segment_size
+
+ # Calculate the starting segment for the upload.
if segment_size:
- self.num_segments = mathutil.div_ceil(len(self.newdata),
+ # We use div_ceil instead of integer division here because
+ # it is semantically correct.
+ # If datalength isn't an even multiple of segment_size, but
+ # is larger than segment_size, datalength // segment_size
+ # will be the largest number such that num <= datalength and
+ # num % segment_size == 0. But that's not what we want,
+ # because it ignores the extra data. div_ceil will give us
+ # the right number of segments for the data that we're
+ # given.
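+            # For example, with datalength = 130 KiB and segment_size =
+            # 128 KiB, div_ceil gives 2 segments, while floor division
+            # would give 1 and silently drop the trailing 2 KiB.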
+ self.num_segments = mathutil.div_ceil(self.datalength,
segment_size)
+
+ self.starting_segment = offset // segment_size
+
else:
self.num_segments = 0
- assert self.num_segments in [0, 1,] # SDMF restrictions
+ self.starting_segment = 0
- def _fatal_error(self, f):
- self.log("error during loop", failure=f, level=log.UNUSUAL)
- self._done(f)
- def _update_status(self):
- self._status.set_status("Sending Shares: %d placed out of %d, "
- "%d messages outstanding" %
- (len(self.placed),
- len(self.goal),
- len(self.outstanding)))
- self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+ self.log("building encoding parameters for file")
+ self.log("got segsize %d" % self.segment_size)
+ self.log("got %d segments" % self.num_segments)
- def loop(self, ignored=None):
- self.log("entering loop", level=log.NOISY)
- if not self._running:
- return
+ if self._version == SDMF_VERSION:
+ assert self.num_segments in (0, 1) # SDMF
+ # calculate the tail segment size.
+
+ if segment_size and self.datalength:
+ self.tail_segment_size = self.datalength % segment_size
+ self.log("got tail segment size %d" % self.tail_segment_size)
+ else:
+ self.tail_segment_size = 0
+
+ if self.tail_segment_size == 0 and segment_size:
+ # The tail segment is the same size as the other segments.
+ self.tail_segment_size = segment_size
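+        # For example, a 300 KiB file with 128 KiB segments has a
+        # 44 KiB tail, while a 256 KiB file has a tail equal to the
+        # full segment size.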
- self.looplimit -= 1
- if self.looplimit <= 0:
- raise LoopLimitExceededError("loop limit exceeded")
+ # Make FEC encoders
+ fec = codec.CRSEncoder()
+ fec.set_params(self.segment_size,
+ self.required_shares, self.total_shares)
+ self.piece_size = fec.get_block_size()
+ self.fec = fec
- if self.surprised:
- # don't send out any new shares, just wait for the outstanding
- # ones to be retired.
- self.log("currently surprised, so don't send any new shares",
- level=log.NOISY)
+ if self.tail_segment_size == self.segment_size:
+ self.tail_fec = self.fec
else:
- self.update_goal()
- # how far are we from our goal?
- needed = self.goal - self.placed - self.outstanding
- self._update_status()
-
- if needed:
- # we need to send out new shares
- self.log(format="need to send %(needed)d new shares",
- needed=len(needed), level=log.NOISY)
- self._send_shares(needed)
- return
-
- if self.outstanding:
- # queries are still pending, keep waiting
- self.log(format="%(outstanding)d queries still outstanding",
- outstanding=len(self.outstanding),
- level=log.NOISY)
- return
+ tail_fec = codec.CRSEncoder()
+ tail_fec.set_params(self.tail_segment_size,
+ self.required_shares,
+ self.total_shares)
+ self.tail_fec = tail_fec
+
+ self._current_segment = self.starting_segment
+ self.end_segment = self.num_segments - 1
+ # Now figure out where the last segment should be.
+ if self.data.get_size() != self.datalength:
+ # We're updating a few segments in the middle of a mutable
+ # file, so we don't want to republish the whole thing.
+ # (we don't have enough data to do that even if we wanted
+ # to)
+ end = self.data.get_size()
+ self.end_segment = end // segment_size
+ if end % segment_size == 0:
+ self.end_segment -= 1
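+            # (e.g. new data ending exactly at 256 KiB with 128 KiB
+            # segments leaves the segment at that boundary untouched,
+            # hence the decrement.)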
+
+ self.log("got start segment %d" % self.starting_segment)
+ self.log("got end segment %d" % self.end_segment)
+
+
+ def _push(self, ignored=None):
+ """
+        I manage state transitions. In particular, I check that we still
+        have enough writers to complete the upload successfully.
+ """
+ # Can we still successfully publish this file?
+ # TODO: Keep track of outstanding queries before aborting the
+ # process.
+ if len(self.writers) < self.required_shares or self.surprised:
+ return self._failure()
+
+ # Figure out what we need to do next. Each of these needs to
+ # return a deferred so that we don't block execution when this
+ # is first called in the upload method.
+ if self._state == PUSHING_BLOCKS_STATE:
+ return self.push_segment(self._current_segment)
+
+ elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
+ return self.push_everything_else()
+
+ # If we make it to this point, we were successful in placing the
+ # file.
+ return self._done()
+
+
+ def push_segment(self, segnum):
+ if self.num_segments == 0 and self._version == SDMF_VERSION:
+ self._add_dummy_salts()
+
+ if segnum > self.end_segment:
+ # We don't have any more segments to push.
+ self._state = PUSHING_EVERYTHING_ELSE_STATE
+ return self._push()
+
+ d = self._encode_segment(segnum)
+ d.addCallback(self._push_segment, segnum)
+ def _increment_segnum(ign):
+ self._current_segment += 1
+ # XXX: I don't think we need to do addBoth here -- any errBacks
+ # should be handled within push_segment.
+ d.addCallback(_increment_segnum)
+ d.addCallback(self._turn_barrier)
+ d.addCallback(self._push)
+ d.addErrback(self._failure)
+
+
+ def _turn_barrier(self, result):
+ """
+ I help the publish process avoid the recursion limit issues
+ described in #237.
+ """
+ return fireEventually(result)
+
+
+ def _add_dummy_salts(self):
+ """
+ SDMF files need a salt even if they're empty, or the signature
+ won't make sense. This method adds a dummy salt to each of our
+ SDMF writers so that they can write the signature later.
+ """
+ salt = os.urandom(16)
+ assert self._version == SDMF_VERSION
+
+ for writer in self.writers.itervalues():
+ writer.put_salt(salt)
+
+
+ def _encode_segment(self, segnum):
+ """
+ I encrypt and encode the segment segnum.
+ """
+ started = time.time()
+
+ if segnum + 1 == self.num_segments:
+ segsize = self.tail_segment_size
+ else:
+ segsize = self.segment_size
+
+
+ self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
+ data = self.data.read(segsize)
+ # XXX: This is dumb. Why return a list?
+ data = "".join(data)
+
+ assert len(data) == segsize, len(data)
+
+ salt = os.urandom(16)
+
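+        # Each segment gets its own random salt; the per-segment AES
+        # key is derived from that salt and the file's readkey.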
+ key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
+ self._status.set_status("Encrypting")
+ enc = AES(key)
+ crypttext = enc.process(data)
+ assert len(crypttext) == len(data)
- # no queries outstanding, no placements needed: we're done
- self.log("no queries outstanding, no placements needed: done",
- level=log.OPERATIONAL)
now = time.time()
- elapsed = now - self._started_pushing
- self._status.timings["push"] = elapsed
- return self._done(None)
+ self._status.timings["encrypt"] = now - started
+ started = now
+
+ # now apply FEC
+ if segnum + 1 == self.num_segments:
+ fec = self.tail_fec
+ else:
+ fec = self.fec
+
+ self._status.set_status("Encoding")
+ crypttext_pieces = [None] * self.required_shares
+ piece_size = fec.get_block_size()
+ for i in range(len(crypttext_pieces)):
+ offset = i * piece_size
+ piece = crypttext[offset:offset+piece_size]
+ piece = piece + "\x00"*(piece_size - len(piece)) # padding
+ crypttext_pieces[i] = piece
+ assert len(piece) == piece_size
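+        # fec.encode expands required_shares equal-sized pieces into
+        # total_shares blocks; the zero-padding above makes the final
+        # piece exactly piece_size.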
+ d = fec.encode(crypttext_pieces)
+ def _done_encoding(res):
+ elapsed = time.time() - started
+ self._status.timings["encode"] = elapsed
+ return (res, salt)
+ d.addCallback(_done_encoding)
+ return d
+
+
+ def _push_segment(self, encoded_and_salt, segnum):
+ """
+ I push (data, salt) as segment number segnum.
+ """
+ results, salt = encoded_and_salt
+ shares, shareids = results
+ self._status.set_status("Pushing segment")
+ for i in xrange(len(shares)):
+ sharedata = shares[i]
+ shareid = shareids[i]
+ if self._version == MDMF_VERSION:
+ hashed = salt + sharedata
+ else:
+ hashed = sharedata
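+            # For MDMF, the block hash covers the per-segment salt as
+            # well as the block, so readers can verify both; SDMF's
+            # single salt is already covered by the signed prefix.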
+ block_hash = hashutil.block_hash(hashed)
+ self.blockhashes[shareid][segnum] = block_hash
+ # find the writer for this share
+ writer = self.writers[shareid]
+ writer.put_block(sharedata, segnum, salt)
+
+
+ def push_everything_else(self):
+ """
+ I put everything else associated with a share.
+ """
+ self._pack_started = time.time()
+ self.push_encprivkey()
+ self.push_blockhashes()
+ self.push_sharehashes()
+ self.push_toplevel_hashes_and_signature()
+ d = self.finish_publishing()
+ def _change_state(ignored):
+ self._state = DONE_STATE
+ d.addCallback(_change_state)
+ d.addCallback(self._push)
+ return d
+
+
+ def push_encprivkey(self):
+ encprivkey = self._encprivkey
+ self._status.set_status("Pushing encrypted private key")
+ for writer in self.writers.itervalues():
+ writer.put_encprivkey(encprivkey)
+
+
+ def push_blockhashes(self):
+ self.sharehash_leaves = [None] * len(self.blockhashes)
+ self._status.set_status("Building and pushing block hash tree")
+ for shnum, blockhashes in self.blockhashes.iteritems():
+ t = hashtree.HashTree(blockhashes)
+ self.blockhashes[shnum] = list(t)
+            # t[0] is the root of this share's block hash tree; save it
+            # as the share's leaf in the share hash tree.
+ self.sharehash_leaves[shnum] = t[0]
+
+ writer = self.writers[shnum]
+ writer.put_blockhashes(self.blockhashes[shnum])
+
+
+ def push_sharehashes(self):
+ self._status.set_status("Building and pushing share hash chain")
+ share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
+ for shnum in xrange(len(self.sharehash_leaves)):
+ needed_indices = share_hash_tree.needed_hashes(shnum)
+ self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
+ for i in needed_indices] )
+ writer = self.writers[shnum]
+ writer.put_sharehashes(self.sharehashes[shnum])
+ self.root_hash = share_hash_tree[0]
+
+
+ def push_toplevel_hashes_and_signature(self):
+        # We need to do three things here:
+ # - Push the root hash and salt hash
+ # - Get the checkstring of the resulting layout; sign that.
+ # - Push the signature
+ self._status.set_status("Pushing root hashes and signature")
+        for writer in self.writers.itervalues():
+            writer.put_root_hash(self.root_hash)
+ self._update_checkstring()
+ self._make_and_place_signature()
+
+
+ def _update_checkstring(self):
+ """
+ After putting the root hash, MDMF files will have the
+ checkstring written to the storage server. This means that we
+ can update our copy of the checkstring so we can detect
+ uncoordinated writes. SDMF files will have the same checkstring,
+ so we need not do anything.
+ """
+ self._checkstring = self.writers.values()[0].get_checkstring()
+
+
+ def _make_and_place_signature(self):
+ """
+ I create and place the signature.
+ """
+ started = time.time()
+ self._status.set_status("Signing prefix")
+        # Any writer will do: the signable prefix is the same for all.
+        signable = self.writers.values()[0].get_signable()
+ self.signature = self._privkey.sign(signable)
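+        # Every share gets the same signature, since the signed prefix
+        # (seqnum, root hash, encoding parameters) is identical across
+        # shares.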
+
+ for (shnum, writer) in self.writers.iteritems():
+ writer.put_signature(self.signature)
+ self._status.timings['sign'] = time.time() - started
+
+
+ def finish_publishing(self):
+ # We're almost done -- we just need to put the verification key
+ # and the offsets
+ started = time.time()
+ self._status.set_status("Pushing shares")
+ self._started_pushing = started
+ ds = []
+ verification_key = self._pubkey.serialize()
+
+
+        # Iterate over a copy of the items: _connection_problem() can
+        # fire synchronously and remove entries from self.writers while
+        # we loop.
+        for (shnum, writer) in self.writers.items():
+ writer.put_verification_key(verification_key)
+ d = writer.finish_publishing()
+ # Add the (peerid, shnum) tuple to our list of outstanding
+            # queries so that we can tell which shares have not yet
+            # been placed.
+ self.outstanding.add((writer.peerid, writer.shnum))
+ d.addCallback(self._got_write_answer, writer, started)
+ d.addErrback(self._connection_problem, writer)
+ ds.append(d)
+ self._record_verinfo()
+ self._status.timings['pack'] = time.time() - started
+ return defer.DeferredList(ds)
+
+
+ def _record_verinfo(self):
+ self.versioninfo = self.writers.values()[0].get_verinfo()
+
+
+ def _connection_problem(self, f, writer):
+ """
+ We ran into a connection problem while working with writer, and
+ need to deal with that.
+ """
+ self.log("found problem: %s" % str(f))
+ self._last_failure = f
+        del self.writers[writer.shnum]
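+        # Removing the writer shrinks self.writers; _push() will abort
+        # the publish if fewer than required_shares writers remain.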
+
def log_goal(self, goal, message=""):
logmsg = [message]
self.log_goal(self.goal, "after update: ")
+ def _got_write_answer(self, answer, writer, started):
+ if not answer:
+            # SDMF writers only pretend to write when callers set their
+ # blocks, salts, and so on -- they actually just write once,
+ # at the end of the upload process. In fake writes, they
+ # return defer.succeed(None). If we see that, we shouldn't
+ # bother checking it.
+ return
- def _encrypt_and_encode(self):
- # this returns a Deferred that fires with a list of (sharedata,
- # sharenum) tuples. TODO: cache the ciphertext, only produce the
- # shares that we care about.
- self.log("_encrypt_and_encode")
-
- self._status.set_status("Encrypting")
- started = time.time()
-
- key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
- enc = AES(key)
- crypttext = enc.process(self.newdata)
- assert len(crypttext) == len(self.newdata)
+ peerid = writer.peerid
+ lp = self.log("_got_write_answer from %s, share %d" %
+ (idlib.shortnodeid_b2a(peerid), writer.shnum))
now = time.time()
- self._status.timings["encrypt"] = now - started
- started = now
-
- # now apply FEC
-
- self._status.set_status("Encoding")
- fec = codec.CRSEncoder()
- fec.set_params(self.segment_size,
- self.required_shares, self.total_shares)
- piece_size = fec.get_block_size()
- crypttext_pieces = [None] * self.required_shares
- for i in range(len(crypttext_pieces)):
- offset = i * piece_size
- piece = crypttext[offset:offset+piece_size]
- piece = piece + "\x00"*(piece_size - len(piece)) # padding
- crypttext_pieces[i] = piece
- assert len(piece) == piece_size
-
- d = fec.encode(crypttext_pieces)
- def _done_encoding(res):
- elapsed = time.time() - started
- self._status.timings["encode"] = elapsed
- return res
- d.addCallback(_done_encoding)
- return d
-
- def _generate_shares(self, shares_and_shareids):
- # this sets self.shares and self.root_hash
- self.log("_generate_shares")
- self._status.set_status("Generating Shares")
- started = time.time()
-
- # we should know these by now
- privkey = self._privkey
- encprivkey = self._encprivkey
- pubkey = self._pubkey
-
- (shares, share_ids) = shares_and_shareids
-
- assert len(shares) == len(share_ids)
- assert len(shares) == self.total_shares
- all_shares = {}
- block_hash_trees = {}
- share_hash_leaves = [None] * len(shares)
- for i in range(len(shares)):
- share_data = shares[i]
- shnum = share_ids[i]
- all_shares[shnum] = share_data
-
- # build the block hash tree. SDMF has only one leaf.
- leaves = [hashutil.block_hash(share_data)]
- t = hashtree.HashTree(leaves)
- block_hash_trees[shnum] = list(t)
- share_hash_leaves[shnum] = t[0]
- for leaf in share_hash_leaves:
- assert leaf is not None
- share_hash_tree = hashtree.HashTree(share_hash_leaves)
- share_hash_chain = {}
- for shnum in range(self.total_shares):
- needed_hashes = share_hash_tree.needed_hashes(shnum)
- share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
- for i in needed_hashes ] )
- root_hash = share_hash_tree[0]
- assert len(root_hash) == 32
- self.log("my new root_hash is %s" % base32.b2a(root_hash))
- self._new_version_info = (self._new_seqnum, root_hash, self.salt)
-
- prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
- self.required_shares, self.total_shares,
- self.segment_size, len(self.newdata))
-
- # now pack the beginning of the share. All shares are the same up
- # to the signature, then they have divergent share hash chains,
- # then completely different block hash trees + salt + share data,
- # then they all share the same encprivkey at the end. The sizes
- # of everything are the same for all shares.
-
- sign_started = time.time()
- signature = privkey.sign(prefix)
- self._status.timings["sign"] = time.time() - sign_started
-
- verification_key = pubkey.serialize()
-
- final_shares = {}
- for shnum in range(self.total_shares):
- final_share = pack_share(prefix,
- verification_key,
- signature,
- share_hash_chain[shnum],
- block_hash_trees[shnum],
- all_shares[shnum],
- encprivkey)
- final_shares[shnum] = final_share
- elapsed = time.time() - started
- self._status.timings["pack"] = elapsed
- self.shares = final_shares
- self.root_hash = root_hash
-
- # we also need to build up the version identifier for what we're
- # pushing. Extract the offsets from one of our shares.
- assert final_shares
- offsets = unpack_header(final_shares.values()[0])[-1]
- offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
- verinfo = (self._new_seqnum, root_hash, self.salt,
- self.segment_size, len(self.newdata),
- self.required_shares, self.total_shares,
- prefix, offsets_tuple)
- self.versioninfo = verinfo
-
-
-
- def _send_shares(self, needed):
- self.log("_send_shares")
-
- # we're finally ready to send out our shares. If we encounter any
- # surprises here, it's because somebody else is writing at the same
- # time. (Note: in the future, when we remove the _query_peers() step
- # and instead speculate about [or remember] which shares are where,
- # surprises here are *not* indications of UncoordinatedWriteError,
- # and we'll need to respond to them more gracefully.)
-
- # needed is a set of (peerid, shnum) tuples. The first thing we do is
- # organize it by peerid.
-
- peermap = DictOfSets()
- for (peerid, shnum) in needed:
- peermap.add(peerid, shnum)
-
- # the next thing is to build up a bunch of test vectors. The
- # semantics of Publish are that we perform the operation if the world
- # hasn't changed since the ServerMap was constructed (more or less).
- # For every share we're trying to place, we create a test vector that
- # tests to see if the server*share still corresponds to the
- # map.
-
- all_tw_vectors = {} # maps peerid to tw_vectors
- sm = self._servermap.servermap
-
- for key in needed:
- (peerid, shnum) = key
-
- if key in sm:
- # an old version of that share already exists on the
- # server, according to our servermap. We will create a
- # request that attempts to replace it.
- old_versionid, old_timestamp = sm[key]
- (old_seqnum, old_root_hash, old_salt, old_segsize,
- old_datalength, old_k, old_N, old_prefix,
- old_offsets_tuple) = old_versionid
- old_checkstring = pack_checkstring(old_seqnum,
- old_root_hash,
- old_salt)
- testv = (0, len(old_checkstring), "eq", old_checkstring)
-
- elif key in self.bad_share_checkstrings:
- old_checkstring = self.bad_share_checkstrings[key]
- testv = (0, len(old_checkstring), "eq", old_checkstring)
-
- else:
- # add a testv that requires the share not exist
-
- # Unfortunately, foolscap-0.2.5 has a bug in the way inbound
- # constraints are handled. If the same object is referenced
- # multiple times inside the arguments, foolscap emits a
- # 'reference' token instead of a distinct copy of the
- # argument. The bug is that these 'reference' tokens are not
- # accepted by the inbound constraint code. To work around
- # this, we need to prevent python from interning the
- # (constant) tuple, by creating a new copy of this vector
- # each time.
-
- # This bug is fixed in foolscap-0.2.6, and even though this
- # version of Tahoe requires foolscap-0.3.1 or newer, we are
- # supposed to be able to interoperate with older versions of
- # Tahoe which are allowed to use older versions of foolscap,
- # including foolscap-0.2.5 . In addition, I've seen other
- # foolscap problems triggered by 'reference' tokens (see #541
- # for details). So we must keep this workaround in place.
-
- #testv = (0, 1, 'eq', "")
- testv = tuple([0, 1, 'eq', ""])
-
- testvs = [testv]
- # the write vector is simply the share
- writev = [(0, self.shares[shnum])]
-
- if peerid not in all_tw_vectors:
- all_tw_vectors[peerid] = {}
- # maps shnum to (testvs, writevs, new_length)
- assert shnum not in all_tw_vectors[peerid]
-
- all_tw_vectors[peerid][shnum] = (testvs, writev, None)
-
- # we read the checkstring back from each share, however we only use
- # it to detect whether there was a new share that we didn't know
- # about. The success or failure of the write will tell us whether
- # there was a collision or not. If there is a collision, the first
- # thing we'll do is update the servermap, which will find out what
- # happened. We could conceivably reduce a roundtrip by using the
- # readv checkstring to populate the servermap, but really we'd have
- # to read enough data to validate the signatures too, so it wouldn't
- # be an overall win.
- read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
-
- # ok, send the messages!
- self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
- started = time.time()
- for (peerid, tw_vectors) in all_tw_vectors.items():
-
- write_enabler = self._node.get_write_enabler(peerid)
- renew_secret = self._node.get_renewal_secret(peerid)
- cancel_secret = self._node.get_cancel_secret(peerid)
- secrets = (write_enabler, renew_secret, cancel_secret)
- shnums = tw_vectors.keys()
-
- for shnum in shnums:
- self.outstanding.add( (peerid, shnum) )
+ elapsed = now - started
- d = self._do_testreadwrite(peerid, secrets,
- tw_vectors, read_vector)
- d.addCallbacks(self._got_write_answer, self._got_write_error,
- callbackArgs=(peerid, shnums, started),
- errbackArgs=(peerid, shnums, started))
- # tolerate immediate errback, like with DeadReferenceError
- d.addBoth(fireEventually)
- d.addCallback(self.loop)
- d.addErrback(self._fatal_error)
+ self._status.add_per_server_time(peerid, elapsed)
- self._update_status()
- self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
-
- def _do_testreadwrite(self, peerid, secrets,
- tw_vectors, read_vector):
- storage_index = self._storage_index
- ss = self.connections[peerid]
-
- #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName
- d = ss.callRemote("slot_testv_and_readv_and_writev",
- storage_index,
- secrets,
- tw_vectors,
- read_vector)
- return d
+ wrote, read_data = answer
- def _got_write_answer(self, answer, peerid, shnums, started):
- lp = self.log("_got_write_answer from %s" %
- idlib.shortnodeid_b2a(peerid))
- for shnum in shnums:
- self.outstanding.discard( (peerid, shnum) )
+ surprise_shares = set(read_data.keys()) - set([writer.shnum])
- now = time.time()
- elapsed = now - started
- self._status.add_per_server_time(peerid, elapsed)
+        # We need to remove from surprise_shares any shares that we
+        # know we are also writing to that peer via our other writers.
- wrote, read_data = answer
+ # TODO: Precompute this.
+ known_shnums = [x.shnum for x in self.writers.values()
+ if x.peerid == peerid]
+ surprise_shares -= set(known_shnums)
+ self.log("found the following surprise shares: %s" %
+ str(surprise_shares))
- surprise_shares = set(read_data.keys()) - set(shnums)
+ # Now surprise shares contains all of the shares that we did not
+ # expect to be there.
surprised = False
for shnum in surprise_shares:
# read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
checkstring = read_data[shnum][0]
- their_version_info = unpack_checkstring(checkstring)
- if their_version_info == self._new_version_info:
+ # What we want to do here is to see if their (seqnum,
+ # roothash, salt) is the same as our (seqnum, roothash,
+ # salt), or the equivalent for MDMF. The best way to do this
+ # is to store a packed representation of our checkstring
+ # somewhere, then not bother unpacking the other
+ # checkstring.
+ if checkstring == self._checkstring:
# they have the right share, somehow
if (peerid,shnum) in self.goal:
self.log("our testv failed, so the write did not happen",
parent=lp, level=log.WEIRD, umid="8sc26g")
self.surprised = True
- self.bad_peers.add(peerid) # don't ask them again
+                    self.bad_peers.add(writer.peerid) # don't ask them again
# use the checkstring to add information to the log message
for (shnum,readv) in read_data.items():
checkstring = readv[0]
# if expected_version==None, then we didn't expect to see a
# share on that peer, and the 'surprise_shares' clause above
# will have logged it.
- # self.loop() will take care of finding new homes
return
- for shnum in shnums:
- self.placed.add( (peerid, shnum) )
- # and update the servermap
- self._servermap.add_new_share(peerid, shnum,
+ # and update the servermap
+ # self.versioninfo is set during the last phase of publishing.
+        # If we get here, we know that responses correspond to placed
+ # shares, and can safely execute these statements.
+ if self.versioninfo:
+ self.log("wrote successfully: adding new share to servermap")
+ self._servermap.add_new_share(peerid, writer.shnum,
self.versioninfo, started)
-
- # self.loop() will take care of checking to see if we're done
- return
-
- def _got_write_error(self, f, peerid, shnums, started):
- for shnum in shnums:
- self.outstanding.discard( (peerid, shnum) )
- self.bad_peers.add(peerid)
- if self._first_write_error is None:
- self._first_write_error = f
- self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
- shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
- failure=f,
- level=log.UNUSUAL)
- # self.loop() will take care of checking to see if we're done
+ self.placed.add( (peerid, writer.shnum) )
+ self._update_status()
+ # the next method in the deferred chain will check to see if
+ # we're done and successful.
return
- def _done(self, res):
+ def _done(self):
if not self._running:
return
self._running = False
now = time.time()
self._status.timings["total"] = now - self._started
+
+ elapsed = now - self._started_pushing
+ self._status.timings['push'] = elapsed
+
self._status.set_active(False)
- if isinstance(res, failure.Failure):
- self.log("Publish done, with failure", failure=res,
- level=log.WEIRD, umid="nRsR9Q")
- self._status.set_status("Failed")
- elif self.surprised:
- self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
- self._status.set_status("UncoordinatedWriteError")
- # deliver a failure
- res = failure.Failure(UncoordinatedWriteError())
- # TODO: recovery
+ self.log("Publish done, success")
+ self._status.set_status("Finished")
+ self._status.set_progress(1.0)
+ # Get k and segsize, then give them to the caller.
+ hints = {}
+ hints['segsize'] = self.segment_size
+ hints['k'] = self.required_shares
+ self._node.set_downloader_hints(hints)
+ eventually(self.done_deferred.callback, None)
+
+ def _failure(self, f=None):
+ if f:
+ self._last_failure = f
+
+ if not self.surprised:
+ # We ran out of servers
+ msg = "Publish ran out of good servers"
+ if self._last_failure:
+ msg += ", last failure was: %s" % str(self._last_failure)
+ self.log(msg)
+ e = NotEnoughServersError(msg)
+
else:
- self.log("Publish done, success")
- self._status.set_status("Finished")
- self._status.set_progress(1.0)
- eventually(self.done_deferred.callback, res)
+ # We ran into shares that we didn't recognize, which means
+ # that we need to return an UncoordinatedWriteError.
+ self.log("Publish failed with UncoordinatedWriteError")
+ e = UncoordinatedWriteError()
+ f = failure.Failure(e)
+ eventually(self.done_deferred.callback, f)
+
+
+class MutableFileHandle:
+ """
+ I am a mutable uploadable built around a filehandle-like object,
+ usually either a StringIO instance or a handle to an actual file.
+ """
+ implements(IMutableUploadable)
+
+ def __init__(self, filehandle):
+ # The filehandle is defined as a generally file-like object that
+ # has these two methods. We don't care beyond that.
+ assert hasattr(filehandle, "read")
+ assert hasattr(filehandle, "close")
+
+ self._filehandle = filehandle
+ # We must start reading at the beginning of the file, or we risk
+ # encountering errors when the data read does not match the size
+ # reported to the uploader.
+ self._filehandle.seek(0)
+
+ # We have not yet read anything, so our position is 0.
+ self._marker = 0
+
+
+ def get_size(self):
+ """
+ I return the amount of data in my filehandle.
+ """
+ if not hasattr(self, "_size"):
+ old_position = self._filehandle.tell()
+ # Seek to the end of the file by seeking 0 bytes from the
+ # file's end
+ self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
+ self._size = self._filehandle.tell()
+ # Restore the previous position, in case this was called
+ # after a read.
+ self._filehandle.seek(old_position)
+ assert self._filehandle.tell() == old_position
+
+ assert hasattr(self, "_size")
+ return self._size
+
+
+ def pos(self):
+ """
+ I return the position of my read marker -- i.e., how much data I
+ have already read and returned to callers.
+ """
+ return self._marker
+
+
+    def read(self, length):
+ """
+ I return some data (up to length bytes) from my filehandle.
+
+ In most cases, I return length bytes, but sometimes I won't --
+ for example, if I am asked to read beyond the end of a file, or
+ an error occurs.
+ """
+ results = self._filehandle.read(length)
+ self._marker += len(results)
+ return [results]
+
+
+ def close(self):
+ """
+ I close the underlying filehandle. Any further operations on the
+ filehandle fail at this point.
+ """
+ self._filehandle.close()
+
+
+class MutableData(MutableFileHandle):
+ """
+ I am a mutable uploadable built around a string, which I then cast
+ into a StringIO and treat as a filehandle.
+ """
+
+ def __init__(self, s):
+ # Take a string and return a file-like uploadable.
+ assert isinstance(s, str)
+
+ MutableFileHandle.__init__(self, StringIO(s))
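+
+    # A usage sketch (hypothetical caller):
+    #
+    #   uploadable = MutableData("new contents")
+    #   uploadable.get_size()   # => 12
+    #   uploadable.read(3)      # => ["new"]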
+
+
+class TransformingUploadable:
+ """
+ I am an IMutableUploadable that wraps another IMutableUploadable,
+ and some segments that are already on the grid. When I am called to
+ read, I handle merging of boundary segments.
+ """
+ implements(IMutableUploadable)
+
+
+ def __init__(self, data, offset, segment_size, start, end):
+ assert IMutableUploadable.providedBy(data)
+
+ self._newdata = data
+ self._offset = offset
+ self._segment_size = segment_size
+ self._start = start
+ self._end = end
+
+ self._read_marker = 0
+
+ self._first_segment_offset = offset % segment_size
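+        # For example, with segment_size = 128 KiB and offset = 130 KiB,
+        # the new data begins 2 KiB into its starting segment, so the
+        # first 2 KiB of that segment must come from the old data in
+        # self._start.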
+
+ num = self.log("TransformingUploadable: starting", parent=None)
+ self._log_number = num
+ self.log("got fso: %d" % self._first_segment_offset)
+ self.log("got offset: %d" % self._offset)
+
+
+ def log(self, *args, **kwargs):
+ if 'parent' not in kwargs:
+ kwargs['parent'] = self._log_number
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.mutable.transforminguploadable"
+ return log.msg(*args, **kwargs)
+
+
+ def get_size(self):
+ return self._offset + self._newdata.get_size()
+
+
+ def read(self, length):
+ # We can get data from 3 sources here.
+ # 1. The first of the segments provided to us.
+ # 2. The data that we're replacing things with.
+ # 3. The last of the segments provided to us.
+
+        # Are we still in source 1 -- old data from the start of the
+        # first segment?
+ self.log("reading %d bytes" % length)
+
+ old_start_data = ""
+ old_data_length = self._first_segment_offset - self._read_marker
+ if old_data_length > 0:
+ if old_data_length > length:
+ old_data_length = length
+ self.log("returning %d bytes of old start data" % old_data_length)
+
+ old_data_end = old_data_length + self._read_marker
+ old_start_data = self._start[self._read_marker:old_data_end]
+ length -= old_data_length
+ else:
+ # otherwise calculations later get screwed up.
+ old_data_length = 0
+
+ # Is there enough new data to satisfy this read? If not, we need
+ # to pad the end of the data with data from our last segment.
+ old_end_length = length - \
+ (self._newdata.get_size() - self._newdata.pos())
+ old_end_data = ""
+ if old_end_length > 0:
+ self.log("reading %d bytes of old end data" % old_end_length)
+
+ # TODO: We're not explicitly checking for tail segment size
+ # here. Is that a problem?
+ old_data_offset = (length - old_end_length + \
+ old_data_length) % self._segment_size
+ self.log("reading at offset %d" % old_data_offset)
+ old_end = old_data_offset + old_end_length
+ old_end_data = self._end[old_data_offset:old_end]
+ length -= old_end_length
+            # Having consumed the old end data, the rest of this read
+            # must come entirely from the new data.
+            assert length == self._newdata.get_size() - self._newdata.pos()
+
+ self.log("reading %d bytes of new data" % length)
+ new_data = self._newdata.read(length)
+ new_data = "".join(new_data)
+
+ self._read_marker += len(old_start_data + new_data + old_end_data)
+
+ return old_start_data + new_data + old_end_data
+
+ def close(self):
+ pass