From: Brian Warner Date: Tue, 8 Apr 2008 02:01:28 +0000 (-0700) Subject: mutable.py: checkpointing #303 work part 2, Publish is sketched out X-Git-Tag: allmydata-tahoe-1.1.0~248 X-Git-Url: https://git.rkrishnan.org/module-simplejson-index.html?a=commitdiff_plain;h=b53bbaa43cd6c734891a695d1ffc9989d2dcc754;p=tahoe-lafs%2Ftahoe-lafs.git mutable.py: checkpointing #303 work part 2, Publish is sketched out --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index b5d002f9..5fa05816 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -273,7 +273,10 @@ class ServerMap: # datalength, k, N, signed_prefix, offsets) tuple self.servermap = DictOfSets() self.connections = {} # maps peerid to a RemoteReference + self.unreachable_peers = set() # peerids that didn't respond to queries self.problems = [] # mostly for debugging + self.last_update_mode = None + self.last_update_time = 0 def make_versionmap(self): """Return a dict that maps versionid to sets of (shnum, peerid, @@ -505,6 +508,13 @@ class ServermapUpdater: self.num_peers_to_query = N + self.EPSILON initial_peers_to_query, must_query = self._build_initial_querylist() self.required_num_empty_peers = self.EPSILON + + # TODO: also populate self._filenode._privkey + + # TODO: arrange to read 3KB from one peer who is likely to hold a + # share, so we can avoid the latency of that extra roundtrip. 3KB + # would get us the encprivkey from a dirnode with up to 7 + # entries, allowing us to make an update in 2 RTT instead of 3. else: initial_peers_to_query, must_query = self._build_initial_querylist() @@ -583,6 +593,51 @@ class ServermapUpdater: verifier = rsa.create_verifying_key_from_string(pubkey_s) return verifier + def _try_to_extract_privkey(self, data, peerid, shnum): + try: + r = unpack_share(data) + except NeedMoreDataError, e: + # this share won't help us. oh well. + offset = e.encprivkey_offset + length = e.encprivkey_length + self.log("shnum %d on peerid %s: share was too short (%dB) " + "to get the encprivkey; [%d:%d] ought to hold it" % + (shnum, idlib.shortnodeid_b2a(peerid), len(data), + offset, offset+length)) + # NOTE: if uncoordinated writes are taking place, someone might + # change the share (and most probably move the encprivkey) before + # we get a chance to do one of these reads and fetch it. This + # will cause us to see a NotEnoughPeersError(unable to fetch + # privkey) instead of an UncoordinatedWriteError . This is a + # nuisance, but it will go away when we move to DSA-based mutable + # files (since the privkey will be small enough to fit in the + # write cap). + + self._encprivkey_shares.append( (peerid, shnum, offset, length)) + return + + (seqnum, root_hash, IV, k, N, segsize, datalen, + pubkey, signature, share_hash_chain, block_hash_tree, + share_data, enc_privkey) = r + + return self._try_to_validate_privkey(enc_privkey, peerid, shnum) + + def _try_to_validate_privkey(self, enc_privkey, peerid, shnum): + alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) + alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) + if alleged_writekey != self._writekey: + self.log("invalid privkey from %s shnum %d" % + (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD) + return + + # it's good + self.log("got valid privkey from shnum %d on peerid %s" % + (shnum, idlib.shortnodeid_b2a(peerid))) + self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s) + self._encprivkey = enc_privkey + self._node._populate_encprivkey(self._encprivkey) + self._node._populate_privkey(self._privkey) + def _got_results(self, datavs, peerid, readsize, stuff, started): self.log(format="got result from [%(peerid)s], %(numshares)d shares", peerid=idlib.shortnodeid_b2a(peerid), @@ -671,6 +726,7 @@ class ServermapUpdater: self._queries_outstanding.discard(peerid) self._bad_peers.add(peerid) self._servermap.problems.append(f) + self._servermap.unreachable_peers.add(peerid) # TODO: overkill? self._queries_completed += 1 self._last_failure = f @@ -839,6 +895,8 @@ class ServermapUpdater: if not self._running: return self._running = False + self._servermap.last_update_mode = self._mode + self._servermap.last_update_time = self._started # the servermap will not be touched after this eventually(self._done_deferred.callback, self._servermap) @@ -1280,24 +1338,16 @@ class PublishStatus: self.active = value class Publish: - """I represent a single act of publishing the mutable file to the grid.""" + """I represent a single act of publishing the mutable file to the grid. I + will only publish my data if the servermap I am using still represents + the current state of the world. - def __init__(self, filenode): - self._node = filenode - self._storage_index = self._node.get_storage_index() - self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] - num = self._node._client.log("Publish(%s): starting" % prefix) - self._log_number = num - self._status = PublishStatus() - self._status.set_storage_index(self._storage_index) - self._status.set_helper(False) - self._status.set_progress(0.0) - self._status.set_active(True) - self._started = time.time() + To make the initial publish, set servermap to None. + """ - def new__init__(self, filenode, servermap): + def __init__(self, filenode, servermap): self._node = filenode - self._servermap =servermap + self._servermap = servermap self._storage_index = self._node.get_storage_index() self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] num = self._node._client.log("Publish(%s): starting" % prefix) @@ -1330,93 +1380,43 @@ class Publish: # 5: when enough responses are back, we're done self.log("starting publish, datalen is %s" % len(newdata)) - self._status.set_size(len(newdata)) self._writekey = self._node.get_writekey() assert self._writekey, "need write capability to publish" - old_roothash = self._node._current_roothash - old_seqnum = self._node._current_seqnum - assert old_seqnum is not None, "must read before replace" - self._new_seqnum = old_seqnum + 1 - - # read-before-replace also guarantees these fields are available - readkey = self._node.get_readkey() - required_shares = self._node.get_required_shares() - total_shares = self._node.get_total_shares() + # first, which servers will we publish to? We require that the + # servermap was updated in MODE_WRITE, so we can depend upon the + # peerlist computed by that process instead of computing our own. + if self._servermap: + assert self._servermap.last_update_mode == MODE_WRITE + # we will push a version that is one larger than anything present + # in the grid, according to the servermap. + self._new_seqnum = self._servermap.highest_seqnum() + 1 + else: + # If we don't have a servermap, that's because we're doing the + # initial publish + self._new_seqnum = 1 + self._servermap = ServerMap() + + # having an up-to-date servermap (or using a filenode that was just + # created for the first time) also guarantees that the following + # fields are available + self.readkey = self._node.get_readkey() + self.required_shares = self._node.get_required_shares() + assert self.required_shares is not None + self.total_shares = self._node.get_total_shares() + assert self.total_shares is not None self._pubkey = self._node.get_pubkey() - self._status.set_encoding(required_shares, total_shares) - - # these two may not be, we might have to get them from the first peer + assert self._pubkey self._privkey = self._node.get_privkey() + assert self._privkey self._encprivkey = self._node.get_encprivkey() - IV = os.urandom(16) - - # we read only 1KB because all we generally care about is the seqnum - # ("prefix") info, so we know which shares are where. We need to get - # the privkey from somebody, which means reading more like 3KB, but - # the code in _obtain_privkey will ensure that we manage that even if - # we need an extra roundtrip. TODO: arrange to read 3KB from one peer - # who is likely to hold a share, so we can avoid the latency of that - # extra roundtrip. 3KB would get us the encprivkey from a dirnode - # with up to 7 entries, allowing us to make an update in 2 RTT - # instead of 3. - self._read_size = 1000 - self._status.initial_read_size = self._read_size - - d = defer.succeed(total_shares) - d.addCallback(self._query_peers) - d.addCallback(self._query_peers_done) - d.addCallback(self._obtain_privkey) - d.addCallback(self._obtain_privkey_done) - - d.addCallback(self._encrypt_and_encode, newdata, readkey, IV, - required_shares, total_shares) - d.addCallback(self._generate_shares, self._new_seqnum, IV) - - d.addCallback(self._send_shares, IV) - d.addCallback(self._maybe_recover) - d.addCallback(self._done) - return d - - def _query_peers(self, total_shares): - self.log("_query_peers") - self._query_peers_started = now = time.time() - elapsed = now - self._started - self._status.timings["setup"] = elapsed - - storage_index = self._storage_index - - # In 0.7.0, we went through extra work to make sure that we include - # ourselves in the peerlist, mainly to match Retrieve (which did the - # same thing. With the post-0.7.0 Introducer refactoring, we got rid - # of the include-myself flags, and standardized on the - # uploading/downloading node not being special. - - # One nice feature of the old approach was that by putting a share on - # the local storage server, we're more likely to be able to retrieve - # a copy of the encrypted private key (even if all the old servers - # have gone away), so we can regenerate new shares even if we can't - # retrieve the old contents. This need will eventually go away when - # we switch to DSA-based mutable files (which store the private key - # in the URI). - - peerlist = self._node._client.get_permuted_peers("storage", - storage_index) - - current_share_peers = DictOfSets() - reachable_peers = {} - # list of (peerid, shnum, offset, length) where the encprivkey might - # be found - self._encprivkey_shares = [] - - EPSILON = total_shares / 2 - #partial_peerlist = islice(peerlist, total_shares + EPSILON) - partial_peerlist = peerlist[:total_shares+EPSILON] - self._status.peers_queried = len(partial_peerlist) - - self._storage_servers = {} + client = self._node._client + full_peerlist = client.get_permuted_peers("storage", + self._storage_index) + self.full_peerlist = full_peerlist # for use later, immutable + self.bad_peers = set() # peerids who have errbacked/refused requests started = time.time() dl = [] @@ -1427,130 +1427,115 @@ class Publish: peerid, permutedid, reachable_peers, current_share_peers, started) dl.append(d) - d = defer.DeferredList(dl, fireOnOneErrback=True) + d = defer.DeferredList(dl) d.addCallback(self._got_all_query_results, total_shares, reachable_peers, current_share_peers) # TODO: add an errback too, probably to ignore that peer - # TODO: if we can't get a privkey from these servers, consider - # looking farther afield. Be aware of the old 0.7.0 behavior that - # causes us to create our initial directory before we've connected to - # anyone but ourselves.. those old directories may not be - # retrieveable if our own server is no longer in the early part of - # the permuted peerlist. - return d + # we limit the segment size as usual to constrain our memory + # footprint. The max segsize is higher for mutable files, because we + # want to support dirnodes with up to 10k children, and each child + # uses about 330 bytes. If you actually put that much into a + # directory you'll be using a footprint of around 14MB, which is + # higher than we'd like, but it is more important right now to + # support large directories than to make memory usage small when you + # use them. Once we implement MDMF (with multiple segments), we will + # drop this back down, probably to 128KiB. + self.MAX_SEGMENT_SIZE = 3500000 - def _do_read(self, ss, peerid, storage_index, shnums, readv): - # isolate the callRemote to a separate method, so tests can subclass - # Publish and override it - d = ss.callRemote("slot_readv", storage_index, shnums, readv) - return d + segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata)) + # this must be a multiple of self.required_shares + segment_size = mathutil.next_multiple(segment_size, + self.required_shares) + self.segment_size = segment_size + if segment_size: + self.num_segments = mathutil.div_ceil(len(self.newdata), + segment_size) + else: + self.num_segments = 0 + assert self.num_segments in [0, 1,] # SDMF restrictions - def _do_query(self, ss, peerid, storage_index): - self.log("querying %s" % idlib.shortnodeid_b2a(peerid)) - d = self._do_read(ss, peerid, storage_index, [], [(0, self._read_size)]) - return d + self.surprised = False - def _got_query_results(self, datavs, peerid, permutedid, - reachable_peers, current_share_peers, started): + # we keep track of three tables. The first is our goal: which share + # we want to see on which servers. This is initially populated by the + # existing servermap. + self.goal = set() # pairs of (peerid, shnum) tuples - lp = self.log(format="_got_query_results from %(peerid)s", - peerid=idlib.shortnodeid_b2a(peerid)) - elapsed = time.time() - started - self._status.add_per_server_time(peerid, "read", elapsed) + # the second table is our list of outstanding queries: those which + # are in flight and may or may not be delivered, accepted, or + # acknowledged. Items are added to this table when the request is + # sent, and removed when the response returns (or errbacks). + self.outstanding = set() # (peerid, shnum) tuples - assert isinstance(datavs, dict) - reachable_peers[peerid] = permutedid - if not datavs: - self.log("peer has no shares", parent=lp) - for shnum, datav in datavs.items(): - lp2 = self.log("peer has shnum %d" % shnum, parent=lp) - assert len(datav) == 1 - data = datav[0] - # We want (seqnum, root_hash, IV) from all servers to know what - # versions we are replacing. We want the encprivkey from one - # server (assuming it's valid) so we know our own private key, so - # we can sign our update. SMDF: read the whole share from each - # server. TODO: later we can optimize this to transfer less data. - - # we assume that we have enough data to extract the signature. - # TODO: if this raises NeedMoreDataError, arrange to do another - # read pass. - r = unpack_prefix_and_signature(data) - (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey_s, signature, prefix) = r - - # self._pubkey is present because we require read-before-replace - valid = self._pubkey.verify(prefix, signature) - if not valid: - self.log(format="bad signature from %(peerid)s shnum %(shnum)d", - peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum, - parent=lp2, level=log.WEIRD) - continue - self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d", - shnum=shnum, seqnum=seqnum, - parent=lp2, level=log.NOISY) + # the third is a table of successes: share which have actually been + # placed. These are populated when responses come back with success. + # When self.placed == self.goal, we're done. + self.placed = set() # (peerid, shnum) tuples - share = (shnum, seqnum, root_hash) - current_share_peers.add(shnum, (peerid, seqnum, root_hash) ) + # we use the servermap to populate the initial goal: this way we will + # try to update each existing share in place. + for (peerid, shares) in self._servermap.servermap.items(): + for (shnum, versionid, timestamp) in shares: + self.goal.add( (peerid, shnum) ) - if not self._privkey: - self._try_to_extract_privkey(data, peerid, shnum) + # create the shares. We'll discard these as they are delivered. SMDF: + # we're allowed to hold everything in memory. + d = self._encrypt_and_encode() + d.addCallback(self._generate_shares) + d.addCallback(self.loop) # trigger delivery - def _try_to_extract_privkey(self, data, peerid, shnum): - try: - r = unpack_share(data) - except NeedMoreDataError, e: - # this share won't help us. oh well. - offset = e.encprivkey_offset - length = e.encprivkey_length - self.log("shnum %d on peerid %s: share was too short (%dB) " - "to get the encprivkey; [%d:%d] ought to hold it" % - (shnum, idlib.shortnodeid_b2a(peerid), len(data), - offset, offset+length)) - # NOTE: if uncoordinated writes are taking place, someone might - # change the share (and most probably move the encprivkey) before - # we get a chance to do one of these reads and fetch it. This - # will cause us to see a NotEnoughPeersError(unable to fetch - # privkey) instead of an UncoordinatedWriteError . This is a - # nuisance, but it will go away when we move to DSA-based mutable - # files (since the privkey will be small enough to fit in the - # write cap). + return self.done_deferred - self._encprivkey_shares.append( (peerid, shnum, offset, length)) + def loop(self): + self.update_goal() + # how far are we from our goal? + needed = self.goal - self.placed - self.outstanding + + if needed: + # we need to send out new shares + d = self.send_shares(needed) + d.addCallback(self.loop) + d.addErrback(self._fatal_error) return - (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey, signature, share_hash_chain, block_hash_tree, - share_data, enc_privkey) = r + if self.outstanding: + # queries are still pending, keep waiting + return - return self._try_to_validate_privkey(enc_privkey, peerid, shnum) + # no queries outstanding, no placements needed: we're done + return self._done() - def _try_to_validate_privkey(self, enc_privkey, peerid, shnum): - alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) - alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) - if alleged_writekey != self._writekey: - self.log("invalid privkey from %s shnum %d" % - (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD) - return + def log_goal(self, goal): + logmsg = [] + for (peerid, shnum) in goal: + logmsg.append("sh%d to [%s]" % (shnum, + idlib.shortnodeid_b2a(peerid))) + self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY) + self.log("we are planning to push new seqnum=#%d" % self._new_seqnum, + level=log.NOISY) - # it's good - self.log("got valid privkey from shnum %d on peerid %s" % - (shnum, idlib.shortnodeid_b2a(peerid))) - self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s) - self._encprivkey = enc_privkey - self._node._populate_encprivkey(self._encprivkey) - self._node._populate_privkey(self._privkey) + def update_goal(self): + # first, remove any bad peers from our goal + self.goal = set([ (peerid, shnum) + for (peerid, shnum) in self.goal + if peerid not in self.bad_peers ]) - def _got_all_query_results(self, res, - total_shares, reachable_peers, - current_share_peers): - self.log("_got_all_query_results") + # find the homeless shares: + homefull_shares = set([shnum for (peerid, shnum) in self.goal]) + homeless_shares = set(range(self.total_shares)) - homefull_shares + homeless_shares = sorted(list(homeless_shares)) + # place them somewhere. We prefer unused servers at the beginning of + # the available peer list. + + if not homeless_shares: + return - # now that we know everything about the shares currently out there, - # decide where to place the new shares. + # if log.recording_noisy + if False: + self.log_goal(self.goal) # if an old share X is on a node, put the new share X there too. # TODO: 1: redistribute shares to achieve one-per-peer, by copying @@ -1559,160 +1544,58 @@ class Publish: # TODO: 2: move those shares instead of copying them, to reduce future # update work - # if log.recording_noisy - logmsg = [] - for shnum in range(total_shares): - logmsg2 = [] - for oldplace in current_share_peers.get(shnum, []): - (peerid, seqnum, R) = oldplace - logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid), - seqnum, base32.b2a(R)[:4])) - logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2))) - self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY) - self.log("we are planning to push new seqnum=#%d" % self._new_seqnum, - level=log.NOISY) - - shares_needing_homes = range(total_shares) - target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR)) - shares_per_peer = DictOfSets() - for shnum in range(total_shares): - for oldplace in current_share_peers.get(shnum, []): - (peerid, seqnum, R) = oldplace - if seqnum >= self._new_seqnum: - self.log("the sequence number has changed unexpectedly", - level=log.WEIRD) - self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d", - peerid=idlib.shortnodeid_b2a(peerid), - seqnum=seqnum, - new_seqnum=self._new_seqnum) - raise UncoordinatedWriteError("the sequence number has changed unexpectedly") - target_map.add(shnum, oldplace) - shares_per_peer.add(peerid, shnum) - if shnum in shares_needing_homes: - shares_needing_homes.remove(shnum) - - # now choose homes for the remaining shares. We prefer peers with the - # fewest target shares, then peers with the lowest permuted index. If - # there are no shares already in place, this will assign them - # one-per-peer in the normal permuted order. - while shares_needing_homes: - if not reachable_peers: - prefix = storage.si_b2a(self._node.get_storage_index())[:5] - raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,)) - shnum = shares_needing_homes.pop(0) - possible_homes = reachable_peers.keys() - possible_homes.sort(lambda a,b: - cmp( (len(shares_per_peer.get(a, [])), - reachable_peers[a]), - (len(shares_per_peer.get(b, [])), - reachable_peers[b]) )) - target_peerid = possible_homes[0] - target_map.add(shnum, (target_peerid, None, None) ) - shares_per_peer.add(target_peerid, shnum) - - assert not shares_needing_homes - - target_info = (target_map, shares_per_peer) - return target_info - - def _query_peers_done(self, target_info): - self._obtain_privkey_started = now = time.time() - elapsed = time.time() - self._query_peers_started - self._status.timings["query"] = elapsed - return target_info - - def _obtain_privkey(self, target_info): - # make sure we've got a copy of our private key. - if self._privkey: - # Must have picked it up during _query_peers. We're good to go. - if "privkey_fetch" not in self._status.timings: - self._status.timings["privkey_fetch"] = 0.0 - return target_info - - # Nope, we haven't managed to grab a copy, and we still need it. Ask - # peers one at a time until we get a copy. Only bother asking peers - # who've admitted to holding a share. - - self._privkey_fetch_started = time.time() - target_map, shares_per_peer = target_info - # pull shares from self._encprivkey_shares - if not self._encprivkey_shares: - raise NotEnoughPeersError("Unable to find a copy of the privkey") - - (peerid, shnum, offset, length) = self._encprivkey_shares.pop(0) - ss = self._storage_servers[peerid] - self.log("trying to obtain privkey from %s shnum %d" % - (idlib.shortnodeid_b2a(peerid), shnum)) - d = self._do_privkey_query(ss, peerid, shnum, offset, length) - d.addErrback(self.log_err) - d.addCallback(lambda res: self._obtain_privkey(target_info)) - return d - - def _do_privkey_query(self, rref, peerid, shnum, offset, length): - started = time.time() - d = self._do_read(rref, peerid, self._storage_index, - [shnum], [(offset, length)]) - d.addCallback(self._privkey_query_response, peerid, shnum, started) - return d - - def _privkey_query_response(self, datav, peerid, shnum, started): - elapsed = time.time() - started - self._status.add_per_server_time(peerid, "read", elapsed) - - data = datav[shnum][0] - self._try_to_validate_privkey(data, peerid, shnum) - - elapsed = time.time() - self._privkey_fetch_started - self._status.timings["privkey_fetch"] = elapsed - self._status.privkey_from = peerid - - def _obtain_privkey_done(self, target_info): - elapsed = time.time() - self._obtain_privkey_started - self._status.timings["privkey"] = elapsed - return target_info - - def _encrypt_and_encode(self, target_info, - newdata, readkey, IV, - required_shares, total_shares): + # this is CPU intensive but easy to analyze. We create a sort order + # for each peerid. If the peerid is marked as bad, we don't even put + # them in the list. Then we care about the number of shares which + # have already been assigned to them. After that we care about their + # permutation order. + old_assignments = DictOfSets() + for (peerid, shnum) in self.goal: + old_assignments.add(peerid, shnum) + + peerlist = [] + for i, (peerid, ss) in enumerate(self.full_peerlist): + entry = (len(old_assignments[peerid]), i, peerid, ss) + peerlist.add(entry) + peerlist.sort() + + new_assignments = [] + # we then index this peerlist with an integer, because we may have to + # wrap. We update the goal as we go. + i = 0 + for shnum in homeless_shares: + (ignored1, ignored2, peerid, ss) = peerlist[i] + self.goal.add( (peerid, shnum) ) + i += 1 + if i >= len(peerlist): + i = 0 + + + + def _encrypt_and_encode(self): + # this returns a Deferred that fires with a list of (sharedata, + # sharenum) tuples. TODO: cache the ciphertext, only produce the + # shares that we care about. self.log("_encrypt_and_encode") - started = time.time() + #started = time.time() - key = hashutil.ssk_readkey_data_hash(IV, readkey) + key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey) enc = AES(key) - crypttext = enc.process(newdata) - assert len(crypttext) == len(newdata) + crypttext = enc.process(self.newdata) + assert len(crypttext) == len(self.newdata) - now = time.time() - self._status.timings["encrypt"] = now - started - started = now + #now = time.time() + #self._status.timings["encrypt"] = now - started + #started = now # now apply FEC - # we limit the segment size as usual to constrain our memory - # footprint. The max segsize is higher for mutable files, because we - # want to support dirnodes with up to 10k children, and each child - # uses about 330 bytes. If you actually put that much into a - # directory you'll be using a footprint of around 14MB, which is - # higher than we'd like, but it is more important right now to - # support large directories than to make memory usage small when you - # use them. Once we implement MDMF (with multiple segments), we will - # drop this back down, probably to 128KiB. - self.MAX_SEGMENT_SIZE = 3500000 - data_length = len(crypttext) - - segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext)) - # this must be a multiple of self.required_shares - segment_size = mathutil.next_multiple(segment_size, required_shares) - if segment_size: - self.num_segments = mathutil.div_ceil(len(crypttext), segment_size) - else: - self.num_segments = 0 - assert self.num_segments in [0, 1,] # SDMF restrictions fec = codec.CRSEncoder() - fec.set_params(segment_size, required_shares, total_shares) + fec.set_params(self.segment_size, + self.required_shares, self.total_shares) piece_size = fec.get_block_size() - crypttext_pieces = [None] * required_shares + crypttext_pieces = [None] * self.required_shares for i in range(len(crypttext_pieces)): offset = i * piece_size piece = crypttext[offset:offset+piece_size] @@ -1722,24 +1605,16 @@ class Publish: d = fec.encode(crypttext_pieces) def _done_encoding(res): - elapsed = time.time() - started - self._status.timings["encode"] = elapsed + #elapsed = time.time() - started + #self._status.timings["encode"] = elapsed return res d.addCallback(_done_encoding) - d.addCallback(lambda shares_and_shareids: - (shares_and_shareids, - required_shares, total_shares, - segment_size, data_length, - target_info) ) return d - def _generate_shares(self, (shares_and_shareids, - required_shares, total_shares, - segment_size, data_length, - target_info), - seqnum, IV): + def _generate_shares(self, shares_and_shareids): + # this sets self.shares and self.root_hash self.log("_generate_shares") - started = time.time() + #started = time.time() # we should know these by now privkey = self._privkey @@ -1749,7 +1624,7 @@ class Publish: (shares, share_ids) = shares_and_shareids assert len(shares) == len(share_ids) - assert len(shares) == total_shares + assert len(shares) == self.total_shares all_shares = {} block_hash_trees = {} share_hash_leaves = [None] * len(shares) @@ -1767,7 +1642,7 @@ class Publish: assert leaf is not None share_hash_tree = hashtree.HashTree(share_hash_leaves) share_hash_chain = {} - for shnum in range(total_shares): + for shnum in range(self.total_shares): needed_hashes = share_hash_tree.needed_hashes(shnum) share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i]) for i in needed_hashes ] ) @@ -1775,24 +1650,24 @@ class Publish: assert len(root_hash) == 32 self.log("my new root_hash is %s" % base32.b2a(root_hash)) - prefix = pack_prefix(seqnum, root_hash, IV, - required_shares, total_shares, - segment_size, data_length) + prefix = pack_prefix(self._new_seqnum, root_hash, self.salt, + self.required_shares, self.total_shares, + self.segment_size, len(self.newdata)) # now pack the beginning of the share. All shares are the same up # to the signature, then they have divergent share hash chains, - # then completely different block hash trees + IV + share data, + # then completely different block hash trees + salt + share data, # then they all share the same encprivkey at the end. The sizes # of everything are the same for all shares. - sign_started = time.time() + #sign_started = time.time() signature = privkey.sign(prefix) - self._status.timings["sign"] = time.time() - sign_started + #self._status.timings["sign"] = time.time() - sign_started verification_key = pubkey.serialize() final_shares = {} - for shnum in range(total_shares): + for shnum in range(self.total_shares): final_share = pack_share(prefix, verification_key, signature, @@ -1801,14 +1676,15 @@ class Publish: all_shares[shnum], encprivkey) final_shares[shnum] = final_share - elapsed = time.time() - started - self._status.timings["pack"] = elapsed - return (seqnum, root_hash, final_shares, target_info) + #elapsed = time.time() - started + #self._status.timings["pack"] = elapsed + self.shares = final_shares + self.root_hash = root_hash - def _send_shares(self, (seqnum, root_hash, final_shares, target_info), IV): + def _send_shares(self, needed): self.log("_send_shares") - started = time.time() + #started = time.time() # we're finally ready to send out our shares. If we encounter any # surprises here, it's because somebody else is writing at the same @@ -1817,46 +1693,85 @@ class Publish: # surprises here are *not* indications of UncoordinatedWriteError, # and we'll need to respond to them more gracefully.) - target_map, shares_per_peer = target_info - - my_checkstring = pack_checkstring(seqnum, root_hash, IV) - peer_messages = {} - expected_old_shares = {} - - for shnum, peers in target_map.items(): - for (peerid, old_seqnum, old_root_hash) in peers: - testv = [(0, len(my_checkstring), "le", my_checkstring)] - new_share = final_shares[shnum] - writev = [(0, new_share)] - if peerid not in peer_messages: - peer_messages[peerid] = {} - peer_messages[peerid][shnum] = (testv, writev, None) - if peerid not in expected_old_shares: - expected_old_shares[peerid] = {} - expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash) - - read_vector = [(0, len(my_checkstring))] + # needed is a set of (peerid, shnum) tuples. The first thing we do is + # organize it by peerid. + + peermap = DictOfSets() + for (peerid, shnum) in needed: + peermap[peerid].add(shnum) + + # the next thing is to build up a bunch of test vectors. The + # semantics of Publish are that we perform the operation if the world + # hasn't changed since the ServerMap was constructed (more or less). + # For every share we're trying to place, we create a test vector that + # tests to see if the server*share still corresponds to the + # map. + + all_tw_vectors = {} # maps peerid to tw_vectors + sm = self._servermap.servermap + + for (peerid, shnum) in needed: + testvs = [] + for (old_shnum, old_versionid, old_timestamp) in sm[peerid]: + if old_shnum == shnum: + # an old version of that share already exists on the + # server, according to our servermap. We will create a + # request that attempts to replace it. + (old_seqnum, old_root_hash, old_salt, old_segsize, + old_datalength, old_k, old_N, old_prefix, + old_offsets_tuple) = old_versionid + old_checkstring = pack_checkstring(old_seqnum, + old_root_hash, + old_salt) + testv = [ (0, len(old_checkstring), "eq", old_checkstring) ] + testvs.append(testv) + break + if not testvs: + # add a testv that requires the share not exist + testv = [ (0, 1, 'eq', "") ] + testvs.append(testv) + + # the write vector is simply the share + writev = [(0, self.shares[shnum])] + + if peerid not in all_tw_vectors: + all_tw_vectors[peerid] = {} + # maps shnum to (testvs, writevs, new_length) + assert shnum not in all_tw_vectors[peerid] + + all_tw_vectors[peerid][shnum] = (testvs, writev, None) + + # we read the checkstring back from each share, however we only use + # it to detect whether there was a new share that we didn't know + # about. The success or failure of the write will tell us whether + # there was a collision or not. If there is a collision, the first + # thing we'll do is update the servermap, which will find out what + # happened. We could conceivably reduce a roundtrip by using the + # readv checkstring to populate the servermap, but really we'd have + # to read enough data to validate the signatures too, so it wouldn't + # be an overall win. + read_vector = [(0, struct.calcsize(SIGNED_PREFIX))] - dl = [] # ok, send the messages! - self._surprised = False - dispatch_map = DictOfSets() - - for peerid, tw_vectors in peer_messages.items(): + started = time.time() + dl = [] + for (peerid, tw_vectors) in all_tw_vectors.items(): write_enabler = self._node.get_write_enabler(peerid) renew_secret = self._node.get_renewal_secret(peerid) cancel_secret = self._node.get_cancel_secret(peerid) secrets = (write_enabler, renew_secret, cancel_secret) + shnums = tw_vectors.keys() d = self._do_testreadwrite(peerid, secrets, tw_vectors, read_vector) - d.addCallback(self._got_write_answer, tw_vectors, my_checkstring, - peerid, expected_old_shares[peerid], dispatch_map, - started) + d.addCallbacks(self._got_write_answer, self._got_write_error, + callbackArgs=(peerid, shnums, started), + errbackArgs=(peerid, shnums, started)) + d.addErrback(self.error, peerid) dl.append(d) - d = defer.DeferredList(dl, fireOnOneErrback=True) + d = defer.DeferredList(dl) def _done_sending(res): elapsed = time.time() - started self._status.timings["push"] = elapsed @@ -1868,7 +1783,7 @@ class Publish: def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): - storage_index = self._node._uri.storage_index + storage_index = self._storage_index ss = self._storage_servers[peerid] d = ss.callRemote("slot_testv_and_readv_and_writev", @@ -1878,54 +1793,63 @@ class Publish: read_vector) return d - def _got_write_answer(self, answer, tw_vectors, my_checkstring, - peerid, expected_old_shares, - dispatch_map, started): + def _got_write_answer(self, answer, peerid, shnums, started): lp = self.log("_got_write_answer from %s" % idlib.shortnodeid_b2a(peerid)) - elapsed = time.time() - started - self._status.add_per_server_time(peerid, "write", elapsed) + for shnum in shnums: + self.outstanding.discard( (peerid, shnum) ) wrote, read_data = answer - surprised = False - (new_seqnum,new_root_hash,new_IV) = unpack_checkstring(my_checkstring) + if not wrote: + # TODO: use the checkstring to add information to the log message + #self.log("somebody modified the share on us:" + # " shnum=%d: I thought they had #%d:R=%s," + # " but testv reported #%d:R=%s" % + # (shnum, + # seqnum, base32.b2a(root_hash)[:4], + # old_seqnum, base32.b2a(old_root_hash)[:4]), + # parent=lp, level=log.WEIRD) + self.log("our testv failed, so the write did not happen", + parent=lp, level=log.WEIRD) + self.surprised = True + self.bad_peers.add(peerid) # don't ask them again + # self.loop() will take care of finding new homes + return - if wrote: - for shnum in tw_vectors: - dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash)) - else: - # surprise! our testv failed, so the write did not happen - self.log("our testv failed, that write did not happen", + sm = self._servermap.servermap + for shnum in shnums: + self.placed.add( (peerid, shnum) ) + # and update the servermap. We strip the old entry out.. + newset = set([ t + for t in sm[peerid] + if t[0] != shnum ]) + sm[peerid] = newset + # and add a new one + sm[peerid].add( (shnum, self.versioninfo, started) ) + + surprise_shares = set(read_data.keys()) - set(shnums) + if surprise_shares: + self.log("they had shares %s that we didn't know about" % + (list(surprise_shares),), parent=lp, level=log.WEIRD) - surprised = True - - for shnum, (old_cs,) in read_data.items(): - (old_seqnum, old_root_hash, IV) = unpack_checkstring(old_cs) - - if not wrote: - dispatch_map.add(shnum, (peerid, old_seqnum, old_root_hash)) - - if shnum not in expected_old_shares: - # surprise! there was a share we didn't know about - self.log("they had share %d that we didn't know about" % shnum, - parent=lp, level=log.WEIRD) - surprised = True - else: - seqnum, root_hash = expected_old_shares[shnum] - if seqnum is not None: - if seqnum != old_seqnum or root_hash != old_root_hash: - # surprise! somebody modified the share on us - self.log("somebody modified the share on us:" - " shnum=%d: I thought they had #%d:R=%s," - " but testv reported #%d:R=%s" % - (shnum, - seqnum, base32.b2a(root_hash)[:4], - old_seqnum, base32.b2a(old_root_hash)[:4]), - parent=lp, level=log.WEIRD) - surprised = True - if surprised: - self._surprised = True + self.surprised = True + return + + # self.loop() will take care of checking to see if we're done + return + + def _got_write_error(self, f, peerid, shnums, started): + for shnum in shnums: + self.outstanding.discard( (peerid, shnum) ) + self.bad_peers.add(peerid) + self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s", + shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid), + level=log.UNUSUAL) + # self.loop() will take care of checking to see if we're done + return + + def _log_dispatch_map(self, dispatch_map): for shnum, places in dispatch_map.items(): @@ -1960,6 +1884,30 @@ class Publish: return self._status + def _do_privkey_query(self, rref, peerid, shnum, offset, length): + started = time.time() + d = self._do_read(rref, peerid, self._storage_index, + [shnum], [(offset, length)]) + d.addCallback(self._privkey_query_response, peerid, shnum, started) + return d + + def _privkey_query_response(self, datav, peerid, shnum, started): + elapsed = time.time() - started + self._status.add_per_server_time(peerid, "read", elapsed) + + data = datav[shnum][0] + self._try_to_validate_privkey(data, peerid, shnum) + + elapsed = time.time() - self._privkey_fetch_started + self._status.timings["privkey_fetch"] = elapsed + self._status.privkey_from = peerid + + def _obtain_privkey_done(self, target_info): + elapsed = time.time() - self._obtain_privkey_started + self._status.timings["privkey"] = elapsed + return target_info + + # use client.create_mutable_file() to make one of these class MutableFileNode: @@ -2181,9 +2129,35 @@ class MutableFileNode: assert self._pubkey, "update_servermap must be called before publish" d = self.obtain_lock() d.addCallback(lambda res: Publish(self, servermap).publish(newdata)) + # p = self.publish_class(self) + # self._client.notify_publish(p) d.addCallback(self.release_lock) return d + def modify(self, modifier, *args, **kwargs): + """I use a modifier callback to apply a change to the mutable file. + I implement the following pseudocode:: + + obtain_mutable_filenode_lock() + while True: + update_servermap(MODE_WRITE) + old = retrieve_best_version() + new = modifier(old, *args, **kwargs) + if new == old: break + try: + publish(new) + except UncoordinatedWriteError: + continue + break + release_mutable_filenode_lock() + + The idea is that your modifier function can apply a delta of some + sort, and it will be re-run as necessary until it succeeds. The + modifier must inspect the old version to see whether its delta has + already been applied: if so it should return the contents unmodified. + """ + NotImplementedError + ################################# def check(self):