# datalength, k, N, signed_prefix, offsets) tuple
self.servermap = DictOfSets()
self.connections = {} # maps peerid to a RemoteReference
+ self.unreachable_peers = set() # peerids that didn't respond to queries
self.problems = [] # mostly for debugging
+ self.last_update_mode = None
+ self.last_update_time = 0
def make_versionmap(self):
"""Return a dict that maps versionid to sets of (shnum, peerid,
self.num_peers_to_query = N + self.EPSILON
initial_peers_to_query, must_query = self._build_initial_querylist()
self.required_num_empty_peers = self.EPSILON
+
+ # TODO: also populate self._filenode._privkey
+
+ # TODO: arrange to read 3KB from one peer who is likely to hold a
+ # share, so we can avoid the latency of that extra roundtrip. 3KB
+ # would get us the encprivkey from a dirnode with up to 7
+ # entries, allowing us to make an update in 2 RTT instead of 3.
else:
initial_peers_to_query, must_query = self._build_initial_querylist()
verifier = rsa.create_verifying_key_from_string(pubkey_s)
return verifier
+ def _try_to_extract_privkey(self, data, peerid, shnum):
+ try:
+ r = unpack_share(data)
+ except NeedMoreDataError, e:
+ # this share won't help us. oh well.
+ offset = e.encprivkey_offset
+ length = e.encprivkey_length
+ self.log("shnum %d on peerid %s: share was too short (%dB) "
+ "to get the encprivkey; [%d:%d] ought to hold it" %
+ (shnum, idlib.shortnodeid_b2a(peerid), len(data),
+ offset, offset+length))
+ # NOTE: if uncoordinated writes are taking place, someone might
+ # change the share (and most probably move the encprivkey) before
+ # we get a chance to do one of these reads and fetch it. This
+ # will cause us to see a NotEnoughPeersError(unable to fetch
+ # privkey) instead of an UncoordinatedWriteError. This is a
+ # nuisance, but it will go away when we move to DSA-based mutable
+ # files (since the privkey will be small enough to fit in the
+ # write cap).
+
+ self._encprivkey_shares.append( (peerid, shnum, offset, length))
+ return
+
+ (seqnum, root_hash, IV, k, N, segsize, datalen,
+ pubkey, signature, share_hash_chain, block_hash_tree,
+ share_data, enc_privkey) = r
+
+ return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+
+ def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
+ alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+ alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+ if alleged_writekey != self._writekey:
+ self.log("invalid privkey from %s shnum %d" %
+ (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
+ return
+
+ # it's good
+ self.log("got valid privkey from shnum %d on peerid %s" %
+ (shnum, idlib.shortnodeid_b2a(peerid)))
+ self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+ self._encprivkey = enc_privkey
+ self._node._populate_encprivkey(self._encprivkey)
+ self._node._populate_privkey(self._privkey)
+
def _got_results(self, datavs, peerid, readsize, stuff, started):
self.log(format="got result from [%(peerid)s], %(numshares)d shares",
peerid=idlib.shortnodeid_b2a(peerid),
self._queries_outstanding.discard(peerid)
self._bad_peers.add(peerid)
self._servermap.problems.append(f)
+ self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
self._queries_completed += 1
self._last_failure = f
if not self._running:
return
self._running = False
+ self._servermap.last_update_mode = self._mode
+ self._servermap.last_update_time = self._started
# the servermap will not be touched after this
eventually(self._done_deferred.callback, self._servermap)
self.active = value
class Publish:
- """I represent a single act of publishing the mutable file to the grid."""
+ """I represent a single act of publishing the mutable file to the grid. I
+ will only publish my data if the servermap I am using still represents
+ the current state of the world.
- def __init__(self, filenode):
- self._node = filenode
- self._storage_index = self._node.get_storage_index()
- self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
- num = self._node._client.log("Publish(%s): starting" % prefix)
- self._log_number = num
- self._status = PublishStatus()
- self._status.set_storage_index(self._storage_index)
- self._status.set_helper(False)
- self._status.set_progress(0.0)
- self._status.set_active(True)
- self._started = time.time()
+ To make the initial publish, set servermap to None.
+ """
- def new__init__(self, filenode, servermap):
+ def __init__(self, filenode, servermap):
self._node = filenode
- self._servermap =servermap
+ self._servermap = servermap
self._storage_index = self._node.get_storage_index()
self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Publish(%s): starting" % prefix)
# 5: when enough responses are back, we're done
self.log("starting publish, datalen is %s" % len(newdata))
- self._status.set_size(len(newdata))
self._writekey = self._node.get_writekey()
assert self._writekey, "need write capability to publish"
- old_roothash = self._node._current_roothash
- old_seqnum = self._node._current_seqnum
- assert old_seqnum is not None, "must read before replace"
- self._new_seqnum = old_seqnum + 1
-
- # read-before-replace also guarantees these fields are available
- readkey = self._node.get_readkey()
- required_shares = self._node.get_required_shares()
- total_shares = self._node.get_total_shares()
+ # first, which servers will we publish to? We require that the
+ # servermap was updated in MODE_WRITE, so we can depend upon the
+ # peerlist computed by that process instead of computing our own.
+ if self._servermap:
+ assert self._servermap.last_update_mode == MODE_WRITE
+ # we will push a version that is one larger than anything present
+ # in the grid, according to the servermap.
+ self._new_seqnum = self._servermap.highest_seqnum() + 1
+ else:
+ # If we don't have a servermap, that's because we're doing the
+ # initial publish
+ self._new_seqnum = 1
+ self._servermap = ServerMap()
+
+ # having an up-to-date servermap (or using a filenode that was just
+ # created for the first time) also guarantees that the following
+ # fields are available
+ self.readkey = self._node.get_readkey()
+ self.required_shares = self._node.get_required_shares()
+ assert self.required_shares is not None
+ self.total_shares = self._node.get_total_shares()
+ assert self.total_shares is not None
self._pubkey = self._node.get_pubkey()
- self._status.set_encoding(required_shares, total_shares)
-
- # these two may not be, we might have to get them from the first peer
+ assert self._pubkey
self._privkey = self._node.get_privkey()
+ assert self._privkey
self._encprivkey = self._node.get_encprivkey()
- IV = os.urandom(16)
-
- # we read only 1KB because all we generally care about is the seqnum
- # ("prefix") info, so we know which shares are where. We need to get
- # the privkey from somebody, which means reading more like 3KB, but
- # the code in _obtain_privkey will ensure that we manage that even if
- # we need an extra roundtrip. TODO: arrange to read 3KB from one peer
- # who is likely to hold a share, so we can avoid the latency of that
- # extra roundtrip. 3KB would get us the encprivkey from a dirnode
- # with up to 7 entries, allowing us to make an update in 2 RTT
- # instead of 3.
- self._read_size = 1000
- self._status.initial_read_size = self._read_size
-
- d = defer.succeed(total_shares)
- d.addCallback(self._query_peers)
- d.addCallback(self._query_peers_done)
- d.addCallback(self._obtain_privkey)
- d.addCallback(self._obtain_privkey_done)
-
- d.addCallback(self._encrypt_and_encode, newdata, readkey, IV,
- required_shares, total_shares)
- d.addCallback(self._generate_shares, self._new_seqnum, IV)
-
- d.addCallback(self._send_shares, IV)
- d.addCallback(self._maybe_recover)
- d.addCallback(self._done)
- return d
-
- def _query_peers(self, total_shares):
- self.log("_query_peers")
- self._query_peers_started = now = time.time()
- elapsed = now - self._started
- self._status.timings["setup"] = elapsed
-
- storage_index = self._storage_index
-
- # In 0.7.0, we went through extra work to make sure that we include
- # ourselves in the peerlist, mainly to match Retrieve (which did the
- # same thing. With the post-0.7.0 Introducer refactoring, we got rid
- # of the include-myself flags, and standardized on the
- # uploading/downloading node not being special.
-
- # One nice feature of the old approach was that by putting a share on
- # the local storage server, we're more likely to be able to retrieve
- # a copy of the encrypted private key (even if all the old servers
- # have gone away), so we can regenerate new shares even if we can't
- # retrieve the old contents. This need will eventually go away when
- # we switch to DSA-based mutable files (which store the private key
- # in the URI).
-
- peerlist = self._node._client.get_permuted_peers("storage",
- storage_index)
-
- current_share_peers = DictOfSets()
- reachable_peers = {}
- # list of (peerid, shnum, offset, length) where the encprivkey might
- # be found
- self._encprivkey_shares = []
-
- EPSILON = total_shares / 2
- #partial_peerlist = islice(peerlist, total_shares + EPSILON)
- partial_peerlist = peerlist[:total_shares+EPSILON]
- self._status.peers_queried = len(partial_peerlist)
-
- self._storage_servers = {}
+ client = self._node._client
+ full_peerlist = client.get_permuted_peers("storage",
+ self._storage_index)
+ self.full_peerlist = full_peerlist # for use later, immutable
+ self.bad_peers = set() # peerids who have errbacked/refused requests
started = time.time()
dl = []
peerid, permutedid,
reachable_peers, current_share_peers, started)
dl.append(d)
- d = defer.DeferredList(dl, fireOnOneErrback=True)
+ d = defer.DeferredList(dl)
d.addCallback(self._got_all_query_results,
total_shares, reachable_peers,
current_share_peers)
# TODO: add an errback too, probably to ignore that peer
- # TODO: if we can't get a privkey from these servers, consider
- # looking farther afield. Be aware of the old 0.7.0 behavior that
- # causes us to create our initial directory before we've connected to
- # anyone but ourselves.. those old directories may not be
- # retrieveable if our own server is no longer in the early part of
- # the permuted peerlist.
- return d
+ # we limit the segment size as usual to constrain our memory
+ # footprint. The max segsize is higher for mutable files, because we
+ # want to support dirnodes with up to 10k children, and each child
+ # uses about 330 bytes. If you actually put that much into a
+ # directory you'll be using a footprint of around 14MB, which is
+ # higher than we'd like, but it is more important right now to
+ # support large directories than to make memory usage small when you
+ # use them. Once we implement MDMF (with multiple segments), we will
+ # drop this back down, probably to 128KiB.
+ self.MAX_SEGMENT_SIZE = 3500000
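+ # (Roughly: 10k children * ~330B each is about 3.3MB of plaintext; the
+ # ciphertext plus the expanded shares at the default 3-of-10 encoding,
+ # all held in memory at once, is presumably where the ~14MB figure above
+ # comes from.)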
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
- # isolate the callRemote to a separate method, so tests can subclass
- # Publish and override it
- d = ss.callRemote("slot_readv", storage_index, shnums, readv)
- return d
+ segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata))
+ # this must be a multiple of self.required_shares
+ segment_size = mathutil.next_multiple(segment_size,
+ self.required_shares)
+ self.segment_size = segment_size
+ if segment_size:
+ self.num_segments = mathutil.div_ceil(len(self.newdata),
+ segment_size)
+ else:
+ self.num_segments = 0
+ assert self.num_segments in [0, 1,] # SDMF restrictions
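+ # For example (illustrative): with required_shares=3, a 100-byte file
+ # gets segment_size=102 (the next multiple of 3) and num_segments=1,
+ # while an empty file gets segment_size=0 and num_segments=0.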
- def _do_query(self, ss, peerid, storage_index):
- self.log("querying %s" % idlib.shortnodeid_b2a(peerid))
- d = self._do_read(ss, peerid, storage_index, [], [(0, self._read_size)])
- return d
+ self.surprised = False
- def _got_query_results(self, datavs, peerid, permutedid,
- reachable_peers, current_share_peers, started):
+ # we keep track of three tables. The first is our goal: which share
+ # we want to see on which servers. This is initially populated by the
+ # existing servermap.
+ self.goal = set() # pairs of (peerid, shnum) tuples
- lp = self.log(format="_got_query_results from %(peerid)s",
- peerid=idlib.shortnodeid_b2a(peerid))
- elapsed = time.time() - started
- self._status.add_per_server_time(peerid, "read", elapsed)
+ # the second table is our list of outstanding queries: those which
+ # are in flight and may or may not be delivered, accepted, or
+ # acknowledged. Items are added to this table when the request is
+ # sent, and removed when the response returns (or errbacks).
+ self.outstanding = set() # (peerid, shnum) tuples
- assert isinstance(datavs, dict)
- reachable_peers[peerid] = permutedid
- if not datavs:
- self.log("peer has no shares", parent=lp)
- for shnum, datav in datavs.items():
- lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
- assert len(datav) == 1
- data = datav[0]
- # We want (seqnum, root_hash, IV) from all servers to know what
- # versions we are replacing. We want the encprivkey from one
- # server (assuming it's valid) so we know our own private key, so
- # we can sign our update. SMDF: read the whole share from each
- # server. TODO: later we can optimize this to transfer less data.
-
- # we assume that we have enough data to extract the signature.
- # TODO: if this raises NeedMoreDataError, arrange to do another
- # read pass.
- r = unpack_prefix_and_signature(data)
- (seqnum, root_hash, IV, k, N, segsize, datalen,
- pubkey_s, signature, prefix) = r
-
- # self._pubkey is present because we require read-before-replace
- valid = self._pubkey.verify(prefix, signature)
- if not valid:
- self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
- peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
- parent=lp2, level=log.WEIRD)
- continue
- self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
- shnum=shnum, seqnum=seqnum,
- parent=lp2, level=log.NOISY)
+ # the third is a table of successes: shares which have actually been
+ # placed. These are populated when responses come back with success.
+ # When self.placed == self.goal, we're done.
+ self.placed = set() # (peerid, shnum) tuples
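+ # (Illustrative: if goal={(pA,0),(pB,1)}, placed={(pA,0)}, and nothing
+ # is outstanding, then loop() still needs to send sh1 to pB; once placed
+ # covers the goal and nothing is outstanding, the publish is finished.)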
- share = (shnum, seqnum, root_hash)
- current_share_peers.add(shnum, (peerid, seqnum, root_hash) )
+ # we use the servermap to populate the initial goal: this way we will
+ # try to update each existing share in place.
+ for (peerid, shares) in self._servermap.servermap.items():
+ for (shnum, versionid, timestamp) in shares:
+ self.goal.add( (peerid, shnum) )
- if not self._privkey:
- self._try_to_extract_privkey(data, peerid, shnum)
+ # create the shares. We'll discard these as they are delivered. SDMF:
+ # we're allowed to hold everything in memory.
+ d = self._encrypt_and_encode()
+ d.addCallback(self._generate_shares)
+ d.addCallback(self.loop) # trigger delivery
- def _try_to_extract_privkey(self, data, peerid, shnum):
- try:
- r = unpack_share(data)
- except NeedMoreDataError, e:
- # this share won't help us. oh well.
- offset = e.encprivkey_offset
- length = e.encprivkey_length
- self.log("shnum %d on peerid %s: share was too short (%dB) "
- "to get the encprivkey; [%d:%d] ought to hold it" %
- (shnum, idlib.shortnodeid_b2a(peerid), len(data),
- offset, offset+length))
- # NOTE: if uncoordinated writes are taking place, someone might
- # change the share (and most probably move the encprivkey) before
- # we get a chance to do one of these reads and fetch it. This
- # will cause us to see a NotEnoughPeersError(unable to fetch
- # privkey) instead of an UncoordinatedWriteError . This is a
- # nuisance, but it will go away when we move to DSA-based mutable
- # files (since the privkey will be small enough to fit in the
- # write cap).
+ return self.done_deferred
- self._encprivkey_shares.append( (peerid, shnum, offset, length))
+ def loop(self, ignored=None):
+ self.update_goal()
+ # how far are we from our goal?
+ needed = self.goal - self.placed - self.outstanding
+
+ if needed:
+ # we need to send out new shares
+ d = self._send_shares(needed)
+ d.addCallback(self.loop)
+ d.addErrback(self._fatal_error)
return
- (seqnum, root_hash, IV, k, N, segsize, datalen,
- pubkey, signature, share_hash_chain, block_hash_tree,
- share_data, enc_privkey) = r
+ if self.outstanding:
+ # queries are still pending, keep waiting
+ return
- return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
+ # no queries outstanding, no placements needed: we're done
+ return self._done()
- def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
- alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
- alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
- if alleged_writekey != self._writekey:
- self.log("invalid privkey from %s shnum %d" %
- (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
- return
+ def log_goal(self, goal):
+ logmsg = []
+ for (peerid, shnum) in goal:
+ logmsg.append("sh%d to [%s]" % (shnum,
+ idlib.shortnodeid_b2a(peerid)))
+ self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
+ self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
+ level=log.NOISY)
- # it's good
- self.log("got valid privkey from shnum %d on peerid %s" %
- (shnum, idlib.shortnodeid_b2a(peerid)))
- self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
- self._encprivkey = enc_privkey
- self._node._populate_encprivkey(self._encprivkey)
- self._node._populate_privkey(self._privkey)
+ def update_goal(self):
+ # first, remove any bad peers from our goal
+ self.goal = set([ (peerid, shnum)
+ for (peerid, shnum) in self.goal
+ if peerid not in self.bad_peers ])
- def _got_all_query_results(self, res,
- total_shares, reachable_peers,
- current_share_peers):
- self.log("_got_all_query_results")
+ # find the homeless shares:
+ homefull_shares = set([shnum for (peerid, shnum) in self.goal])
+ homeless_shares = set(range(self.total_shares)) - homefull_shares
+ homeless_shares = sorted(list(homeless_shares))
+ # place them somewhere. We prefer servers that are not already holding
+ # shares, breaking ties in favor of those earliest in the permuted peer
+ # list.
+
+ if not homeless_shares:
+ return
- # now that we know everything about the shares currently out there,
- # decide where to place the new shares.
+ # if log.recording_noisy
+ if False:
+ self.log_goal(self.goal)
# if an old share X is on a node, put the new share X there too.
# TODO: 1: redistribute shares to achieve one-per-peer, by copying
# TODO: 2: move those shares instead of copying them, to reduce future
# update work
- # if log.recording_noisy
- logmsg = []
- for shnum in range(total_shares):
- logmsg2 = []
- for oldplace in current_share_peers.get(shnum, []):
- (peerid, seqnum, R) = oldplace
- logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
- seqnum, base32.b2a(R)[:4]))
- logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
- self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
- self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
- level=log.NOISY)
-
- shares_needing_homes = range(total_shares)
- target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
- shares_per_peer = DictOfSets()
- for shnum in range(total_shares):
- for oldplace in current_share_peers.get(shnum, []):
- (peerid, seqnum, R) = oldplace
- if seqnum >= self._new_seqnum:
- self.log("the sequence number has changed unexpectedly",
- level=log.WEIRD)
- self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
- peerid=idlib.shortnodeid_b2a(peerid),
- seqnum=seqnum,
- new_seqnum=self._new_seqnum)
- raise UncoordinatedWriteError("the sequence number has changed unexpectedly")
- target_map.add(shnum, oldplace)
- shares_per_peer.add(peerid, shnum)
- if shnum in shares_needing_homes:
- shares_needing_homes.remove(shnum)
-
- # now choose homes for the remaining shares. We prefer peers with the
- # fewest target shares, then peers with the lowest permuted index. If
- # there are no shares already in place, this will assign them
- # one-per-peer in the normal permuted order.
- while shares_needing_homes:
- if not reachable_peers:
- prefix = storage.si_b2a(self._node.get_storage_index())[:5]
- raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
- shnum = shares_needing_homes.pop(0)
- possible_homes = reachable_peers.keys()
- possible_homes.sort(lambda a,b:
- cmp( (len(shares_per_peer.get(a, [])),
- reachable_peers[a]),
- (len(shares_per_peer.get(b, [])),
- reachable_peers[b]) ))
- target_peerid = possible_homes[0]
- target_map.add(shnum, (target_peerid, None, None) )
- shares_per_peer.add(target_peerid, shnum)
-
- assert not shares_needing_homes
-
- target_info = (target_map, shares_per_peer)
- return target_info
-
- def _query_peers_done(self, target_info):
- self._obtain_privkey_started = now = time.time()
- elapsed = time.time() - self._query_peers_started
- self._status.timings["query"] = elapsed
- return target_info
-
- def _obtain_privkey(self, target_info):
- # make sure we've got a copy of our private key.
- if self._privkey:
- # Must have picked it up during _query_peers. We're good to go.
- if "privkey_fetch" not in self._status.timings:
- self._status.timings["privkey_fetch"] = 0.0
- return target_info
-
- # Nope, we haven't managed to grab a copy, and we still need it. Ask
- # peers one at a time until we get a copy. Only bother asking peers
- # who've admitted to holding a share.
-
- self._privkey_fetch_started = time.time()
- target_map, shares_per_peer = target_info
- # pull shares from self._encprivkey_shares
- if not self._encprivkey_shares:
- raise NotEnoughPeersError("Unable to find a copy of the privkey")
-
- (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0)
- ss = self._storage_servers[peerid]
- self.log("trying to obtain privkey from %s shnum %d" %
- (idlib.shortnodeid_b2a(peerid), shnum))
- d = self._do_privkey_query(ss, peerid, shnum, offset, length)
- d.addErrback(self.log_err)
- d.addCallback(lambda res: self._obtain_privkey(target_info))
- return d
-
- def _do_privkey_query(self, rref, peerid, shnum, offset, length):
- started = time.time()
- d = self._do_read(rref, peerid, self._storage_index,
- [shnum], [(offset, length)])
- d.addCallback(self._privkey_query_response, peerid, shnum, started)
- return d
-
- def _privkey_query_response(self, datav, peerid, shnum, started):
- elapsed = time.time() - started
- self._status.add_per_server_time(peerid, "read", elapsed)
-
- data = datav[shnum][0]
- self._try_to_validate_privkey(data, peerid, shnum)
-
- elapsed = time.time() - self._privkey_fetch_started
- self._status.timings["privkey_fetch"] = elapsed
- self._status.privkey_from = peerid
-
- def _obtain_privkey_done(self, target_info):
- elapsed = time.time() - self._obtain_privkey_started
- self._status.timings["privkey"] = elapsed
- return target_info
-
- def _encrypt_and_encode(self, target_info,
- newdata, readkey, IV,
- required_shares, total_shares):
+ # this is CPU intensive but easy to analyze. We create a sort order
+ # for each peerid. If the peerid is marked as bad, we don't even put
+ # them in the list. Then we care about the number of shares which
+ # have already been assigned to them. After that we care about their
+ # permutation order.
+ old_assignments = DictOfSets()
+ for (peerid, shnum) in self.goal:
+ old_assignments.add(peerid, shnum)
+
+ peerlist = []
+ for i, (peerid, ss) in enumerate(self.full_peerlist):
+ if peerid in self.bad_peers:
+ continue
+ entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
+ peerlist.append(entry)
+ peerlist.sort()
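+ # For example (illustrative): if full_peerlist is [(pA,ssA), (pB,ssB)]
+ # and pA already holds one share, the entries are (1, 0, pA, ssA) and
+ # (0, 1, pB, ssB); sorting puts pB first, so homeless shares go to the
+ # unused server before doubling up on pA.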
+
+ new_assignments = []
+ # we then index this peerlist with an integer, because we may have to
+ # wrap. We update the goal as we go.
+ i = 0
+ for shnum in homeless_shares:
+ (ignored1, ignored2, peerid, ss) = peerlist[i]
+ self.goal.add( (peerid, shnum) )
+ i += 1
+ if i >= len(peerlist):
+ i = 0
+
+
+ def _encrypt_and_encode(self):
+ # this returns a Deferred that fires with the (shares, shareids) pair
+ # produced by the encoder: a list of share-data strings and the matching
+ # list of share numbers. TODO: cache the ciphertext, only produce the
+ # shares that we care about.
self.log("_encrypt_and_encode")
- started = time.time()
+ #started = time.time()
- key = hashutil.ssk_readkey_data_hash(IV, readkey)
+ key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
enc = AES(key)
- crypttext = enc.process(newdata)
- assert len(crypttext) == len(newdata)
+ crypttext = enc.process(self.newdata)
+ assert len(crypttext) == len(self.newdata)
- now = time.time()
- self._status.timings["encrypt"] = now - started
- started = now
+ #now = time.time()
+ #self._status.timings["encrypt"] = now - started
+ #started = now
# now apply FEC
- # we limit the segment size as usual to constrain our memory
- # footprint. The max segsize is higher for mutable files, because we
- # want to support dirnodes with up to 10k children, and each child
- # uses about 330 bytes. If you actually put that much into a
- # directory you'll be using a footprint of around 14MB, which is
- # higher than we'd like, but it is more important right now to
- # support large directories than to make memory usage small when you
- # use them. Once we implement MDMF (with multiple segments), we will
- # drop this back down, probably to 128KiB.
- self.MAX_SEGMENT_SIZE = 3500000
- data_length = len(crypttext)
-
- segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
- # this must be a multiple of self.required_shares
- segment_size = mathutil.next_multiple(segment_size, required_shares)
- if segment_size:
- self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
- else:
- self.num_segments = 0
- assert self.num_segments in [0, 1,] # SDMF restrictions
fec = codec.CRSEncoder()
- fec.set_params(segment_size, required_shares, total_shares)
+ fec.set_params(self.segment_size,
+ self.required_shares, self.total_shares)
piece_size = fec.get_block_size()
- crypttext_pieces = [None] * required_shares
+ crypttext_pieces = [None] * self.required_shares
for i in range(len(crypttext_pieces)):
offset = i * piece_size
piece = crypttext[offset:offset+piece_size]
d = fec.encode(crypttext_pieces)
def _done_encoding(res):
- elapsed = time.time() - started
- self._status.timings["encode"] = elapsed
+ #elapsed = time.time() - started
+ #self._status.timings["encode"] = elapsed
return res
d.addCallback(_done_encoding)
- d.addCallback(lambda shares_and_shareids:
- (shares_and_shareids,
- required_shares, total_shares,
- segment_size, data_length,
- target_info) )
return d
- def _generate_shares(self, (shares_and_shareids,
- required_shares, total_shares,
- segment_size, data_length,
- target_info),
- seqnum, IV):
+ def _generate_shares(self, shares_and_shareids):
+ # this sets self.shares and self.root_hash
self.log("_generate_shares")
- started = time.time()
+ #started = time.time()
# we should know these by now
privkey = self._privkey
(shares, share_ids) = shares_and_shareids
assert len(shares) == len(share_ids)
- assert len(shares) == total_shares
+ assert len(shares) == self.total_shares
all_shares = {}
block_hash_trees = {}
share_hash_leaves = [None] * len(shares)
assert leaf is not None
share_hash_tree = hashtree.HashTree(share_hash_leaves)
share_hash_chain = {}
- for shnum in range(total_shares):
+ for shnum in range(self.total_shares):
needed_hashes = share_hash_tree.needed_hashes(shnum)
share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
for i in needed_hashes ] )
assert len(root_hash) == 32
self.log("my new root_hash is %s" % base32.b2a(root_hash))
- prefix = pack_prefix(seqnum, root_hash, IV,
- required_shares, total_shares,
- segment_size, data_length)
+ prefix = pack_prefix(self._new_seqnum, root_hash, self.salt,
+ self.required_shares, self.total_shares,
+ self.segment_size, len(self.newdata))
# now pack the beginning of the share. All shares are the same up
# to the signature, then they have divergent share hash chains,
- # then completely different block hash trees + IV + share data,
+ # then completely different block hash trees + salt + share data,
# then they all share the same encprivkey at the end. The sizes
# of everything are the same for all shares.
- sign_started = time.time()
+ #sign_started = time.time()
signature = privkey.sign(prefix)
- self._status.timings["sign"] = time.time() - sign_started
+ #self._status.timings["sign"] = time.time() - sign_started
verification_key = pubkey.serialize()
final_shares = {}
- for shnum in range(total_shares):
+ for shnum in range(self.total_shares):
final_share = pack_share(prefix,
verification_key,
signature,
all_shares[shnum],
encprivkey)
final_shares[shnum] = final_share
- elapsed = time.time() - started
- self._status.timings["pack"] = elapsed
- return (seqnum, root_hash, final_shares, target_info)
+ #elapsed = time.time() - started
+ #self._status.timings["pack"] = elapsed
+ self.shares = final_shares
+ self.root_hash = root_hash
- def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV):
+ def _send_shares(self, needed):
self.log("_send_shares")
- started = time.time()
+ #started = time.time()
# we're finally ready to send out our shares. If we encounter any
# surprises here, it's because somebody else is writing at the same
# surprises here are *not* indications of UncoordinatedWriteError,
# and we'll need to respond to them more gracefully.)
- target_map, shares_per_peer = target_info
-
- my_checkstring = pack_checkstring(seqnum, root_hash, IV)
- peer_messages = {}
- expected_old_shares = {}
-
- for shnum, peers in target_map.items():
- for (peerid, old_seqnum, old_root_hash) in peers:
- testv = [(0, len(my_checkstring), "le", my_checkstring)]
- new_share = final_shares[shnum]
- writev = [(0, new_share)]
- if peerid not in peer_messages:
- peer_messages[peerid] = {}
- peer_messages[peerid][shnum] = (testv, writev, None)
- if peerid not in expected_old_shares:
- expected_old_shares[peerid] = {}
- expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
-
- read_vector = [(0, len(my_checkstring))]
+ # needed is a set of (peerid, shnum) tuples. The first thing we do is
+ # organize it by peerid.
+
+ peermap = DictOfSets()
+ for (peerid, shnum) in needed:
+ peermap.add(peerid, shnum)
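+ # e.g. (illustrative): needed={(pA,0), (pA,3), (pB,1)} becomes
+ # peermap = {pA: set([0,3]), pB: set([1])}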
+
+ # the next thing is to build up a bunch of test vectors. The
+ # semantics of Publish are that we perform the operation if the world
+ # hasn't changed since the ServerMap was constructed (more or less).
+ # For every share we're trying to place, we create a test vector that
+ # checks whether that share, on that server, still matches what the
+ # servermap says it holds.
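+ # (Each test vector element is an (offset, length, operator, specimen)
+ # tuple; the server applies our write vector only if
+ # share[offset:offset+length] compares true against the specimen, so an
+ # "eq" test against the old checkstring means "only overwrite the
+ # version we think is there".)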
+
+ all_tw_vectors = {} # maps peerid to tw_vectors
+ sm = self._servermap.servermap
+
+ for (peerid, shnum) in needed:
+ testvs = []
+ for (old_shnum, old_versionid, old_timestamp) in sm.get(peerid, []):
+ if old_shnum == shnum:
+ # an old version of that share already exists on the
+ # server, according to our servermap. We will create a
+ # request that attempts to replace it.
+ (old_seqnum, old_root_hash, old_salt, old_segsize,
+ old_datalength, old_k, old_N, old_prefix,
+ old_offsets_tuple) = old_versionid
+ old_checkstring = pack_checkstring(old_seqnum,
+ old_root_hash,
+ old_salt)
+ testv = [ (0, len(old_checkstring), "eq", old_checkstring) ]
+ testvs.append(testv)
+ break
+ if not testvs:
+ # add a testv that requires the share not exist
+ testv = [ (0, 1, 'eq', "") ]
+ testvs.append(testv)
+
+ # the write vector is simply the share
+ writev = [(0, self.shares[shnum])]
+
+ if peerid not in all_tw_vectors:
+ all_tw_vectors[peerid] = {}
+ # maps shnum to (testvs, writevs, new_length)
+ assert shnum not in all_tw_vectors[peerid]
+
+ all_tw_vectors[peerid][shnum] = (testvs, writev, None)
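+ # (Illustrative: a finished entry looks like
+ #   all_tw_vectors[peerid][shnum] =
+ #     ( [ [(0, len(old_checkstring), "eq", old_checkstring)] ],  # testvs
+ #       [ (0, self.shares[shnum]) ],                             # writev
+ #       None )                                                   # no truncation
+ # )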
+
+ # we read the checkstring back from each share, however we only use
+ # it to detect whether there was a new share that we didn't know
+ # about. The success or failure of the write will tell us whether
+ # there was a collision or not. If there is a collision, the first
+ # thing we'll do is update the servermap, which will find out what
+ # happened. We could conceivably reduce a roundtrip by using the
+ # readv checkstring to populate the servermap, but really we'd have
+ # to read enough data to validate the signatures too, so it wouldn't
+ # be an overall win.
+ read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
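+ # (SIGNED_PREFIX covers the version byte, seqnum, root_hash, salt, k, N,
+ # segsize, and datalen: enough to recognize an unexpected version
+ # without fetching the whole share.)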
- dl = []
# ok, send the messages!
- self._surprised = False
- dispatch_map = DictOfSets()
-
- for peerid, tw_vectors in peer_messages.items():
+ started = time.time()
+ dl = []
+ for (peerid, tw_vectors) in all_tw_vectors.items():
write_enabler = self._node.get_write_enabler(peerid)
renew_secret = self._node.get_renewal_secret(peerid)
cancel_secret = self._node.get_cancel_secret(peerid)
secrets = (write_enabler, renew_secret, cancel_secret)
+ shnums = tw_vectors.keys()
d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
- d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
- peerid, expected_old_shares[peerid], dispatch_map,
- started)
+ d.addCallbacks(self._got_write_answer, self._got_write_error,
+ callbackArgs=(peerid, shnums, started),
+ errbackArgs=(peerid, shnums, started))
+ d.addErrback(self._fatal_error)
dl.append(d)
- d = defer.DeferredList(dl, fireOnOneErrback=True)
+ d = defer.DeferredList(dl)
def _done_sending(res):
elapsed = time.time() - started
self._status.timings["push"] = elapsed
def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
- storage_index = self._node._uri.storage_index
+ storage_index = self._storage_index
ss = self._storage_servers[peerid]
d = ss.callRemote("slot_testv_and_readv_and_writev",
read_vector)
return d
- def _got_write_answer(self, answer, tw_vectors, my_checkstring,
- peerid, expected_old_shares,
- dispatch_map, started):
+ def _got_write_answer(self, answer, peerid, shnums, started):
lp = self.log("_got_write_answer from %s" %
idlib.shortnodeid_b2a(peerid))
- elapsed = time.time() - started
- self._status.add_per_server_time(peerid, "write", elapsed)
+ for shnum in shnums:
+ self.outstanding.discard( (peerid, shnum) )
wrote, read_data = answer
- surprised = False
- (new_seqnum,new_root_hash,new_IV) = unpack_checkstring(my_checkstring)
+ if not wrote:
+ # TODO: use the checkstring to add information to the log message
+ #self.log("somebody modified the share on us:"
+ # " shnum=%d: I thought they had #%d:R=%s,"
+ # " but testv reported #%d:R=%s" %
+ # (shnum,
+ # seqnum, base32.b2a(root_hash)[:4],
+ # old_seqnum, base32.b2a(old_root_hash)[:4]),
+ # parent=lp, level=log.WEIRD)
+ self.log("our testv failed, so the write did not happen",
+ parent=lp, level=log.WEIRD)
+ self.surprised = True
+ self.bad_peers.add(peerid) # don't ask them again
+ # self.loop() will take care of finding new homes
+ return
- if wrote:
- for shnum in tw_vectors:
- dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
- else:
- # surprise! our testv failed, so the write did not happen
- self.log("our testv failed, that write did not happen",
+ sm = self._servermap.servermap
+ for shnum in shnums:
+ self.placed.add( (peerid, shnum) )
+ # and update the servermap. We strip the old entry out..
+ newset = set([ t
+ for t in sm.get(peerid, set())
+ if t[0] != shnum ])
+ sm[peerid] = newset
+ # and add a new one
+ sm[peerid].add( (shnum, self.versioninfo, started) )
+
+ surprise_shares = set(read_data.keys()) - set(shnums)
+ if surprise_shares:
+ self.log("they had shares %s that we didn't know about" %
+ (list(surprise_shares),),
parent=lp, level=log.WEIRD)
- surprised = True
-
- for shnum, (old_cs,) in read_data.items():
- (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs)
-
- if not wrote:
- dispatch_map.add(shnum, (peerid, old_seqnum, old_root_hash))
-
- if shnum not in expected_old_shares:
- # surprise! there was a share we didn't know about
- self.log("they had share %d that we didn't know about" % shnum,
- parent=lp, level=log.WEIRD)
- surprised = True
- else:
- seqnum, root_hash = expected_old_shares[shnum]
- if seqnum is not None:
- if seqnum != old_seqnum or root_hash != old_root_hash:
- # surprise! somebody modified the share on us
- self.log("somebody modified the share on us:"
- " shnum=%d: I thought they had #%d:R=%s,"
- " but testv reported #%d:R=%s" %
- (shnum,
- seqnum, base32.b2a(root_hash)[:4],
- old_seqnum, base32.b2a(old_root_hash)[:4]),
- parent=lp, level=log.WEIRD)
- surprised = True
- if surprised:
- self._surprised = True
+ self.surprised = True
+ return
+
+ # self.loop() will take care of checking to see if we're done
+ return
+
+ def _got_write_error(self, f, peerid, shnums, started):
+ for shnum in shnums:
+ self.outstanding.discard( (peerid, shnum) )
+ self.bad_peers.add(peerid)
+ self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s",
+ shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid),
+ level=log.UNUSUAL)
+ # self.loop() will take care of checking to see if we're done
+ return
+
+
def _log_dispatch_map(self, dispatch_map):
for shnum, places in dispatch_map.items():
return self._status
+ def _do_privkey_query(self, rref, peerid, shnum, offset, length):
+ started = time.time()
+ d = self._do_read(rref, peerid, self._storage_index,
+ [shnum], [(offset, length)])
+ d.addCallback(self._privkey_query_response, peerid, shnum, started)
+ return d
+
+ def _privkey_query_response(self, datav, peerid, shnum, started):
+ elapsed = time.time() - started
+ self._status.add_per_server_time(peerid, "read", elapsed)
+
+ data = datav[shnum][0]
+ self._try_to_validate_privkey(data, peerid, shnum)
+
+ elapsed = time.time() - self._privkey_fetch_started
+ self._status.timings["privkey_fetch"] = elapsed
+ self._status.privkey_from = peerid
+
+ def _obtain_privkey_done(self, target_info):
+ elapsed = time.time() - self._obtain_privkey_started
+ self._status.timings["privkey"] = elapsed
+ return target_info
+
+
# use client.create_mutable_file() to make one of these
class MutableFileNode:
assert self._pubkey, "update_servermap must be called before publish"
d = self.obtain_lock()
d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
+ # p = self.publish_class(self)
+ # self._client.notify_publish(p)
d.addCallback(self.release_lock)
return d
+ def modify(self, modifier, *args, **kwargs):
+ """I use a modifier callback to apply a change to the mutable file.
+ I implement the following pseudocode::
+
+ obtain_mutable_filenode_lock()
+ while True:
+ update_servermap(MODE_WRITE)
+ old = retrieve_best_version()
+ new = modifier(old, *args, **kwargs)
+ if new == old: break
+ try:
+ publish(new)
+ except UncoordinatedWriteError:
+ continue
+ break
+ release_mutable_filenode_lock()
+
+ The idea is that your modifier function can apply a delta of some
+ sort, and it will be re-run as necessary until it succeeds. The
+ modifier must inspect the old version to see whether its delta has
+ already been applied: if so it should return the contents unmodified.
+ """
+ raise NotImplementedError()
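+ # Example modifier (illustrative): append a line, but only if it is not
+ # already present, so a retry after UncoordinatedWriteError is harmless:
+ #   def add_line(old_contents):
+ #       if old_contents.endswith("new line\n"):
+ #           return old_contents           # delta already applied
+ #       return old_contents + "new line\n"
+ #   d = filenode.modify(add_line)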
+
#################################
def check(self):