From: Brian Warner Date: Fri, 11 Apr 2008 01:44:06 +0000 (-0700) Subject: mutable: WIP. make Publish work, remove some test scaffolding. test_system still... X-Git-Tag: allmydata-tahoe-1.1.0~247 X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=418407ee5cd18868425125385c8290087f283489;p=tahoe-lafs%2Ftahoe-lafs.git mutable: WIP. make Publish work, remove some test scaffolding. test_system still fails. --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 5fa05816..956f83b6 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1,5 +1,5 @@ -import os, struct, time, weakref +import os, sys, struct, time, weakref from itertools import count from zope.interface import implements from twisted.internet import defer @@ -32,6 +32,9 @@ class UncoordinatedWriteError(Exception): def __repr__(self): return "<%s -- You, oh user, tried to change a file or directory at the same time as another process was trying to change it. To avoid data loss, don't do this. Please see docs/write_coordination.html for details.>" % (self.__class__.__name__,) +class UnrecoverableFileError(Exception): + pass + class CorruptShareError(Exception): def __init__(self, peerid, shnum, reason): self.args = (peerid, shnum, reason) @@ -278,6 +281,18 @@ class ServerMap: self.last_update_mode = None self.last_update_time = 0 + def dump(self, out=sys.stdout): + print >>out, "servermap:" + for (peerid, shares) in self.servermap.items(): + for (shnum, versionid, timestamp) in sorted(shares): + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = versionid + print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" % + (idlib.shortnodeid_b2a(peerid), shnum, + seqnum, base32.b2a(root_hash)[:4], k, N, + datalength)) + return out + def make_versionmap(self): """Return a dict that maps versionid to sets of (shnum, peerid, timestamp) tuples.""" @@ -287,6 +302,18 @@ class ServerMap: versionmap.add(verinfo, (shnum, peerid, timestamp)) return versionmap + def shares_on_peer(self, peerid): + return set([shnum + for (shnum, versionid, timestamp) + in self.servermap.get(peerid, [])]) + + def version_on_peer(self, peerid, shnum): + shares = self.servermap.get(peerid, []) + for (sm_shnum, sm_versionid, sm_timestamp) in shares: + if sm_shnum == shnum: + return sm_versionid + return None + def shares_available(self): """Return a dict that maps versionid to tuples of (num_distinct_shares, k) tuples.""" @@ -301,6 +328,13 @@ class ServerMap: all_shares[versionid] = (len(s), k) return all_shares + def highest_seqnum(self): + available = self.shares_available() + seqnums = [versionid[0] + for versionid in available.keys()] + seqnums.append(0) + return max(seqnums) + def recoverable_versions(self): """Return a set of versionids, one for each version that is currently recoverable.""" @@ -433,7 +467,10 @@ class ServermapUpdater: # * if we only need the checkstring, then [0:75] # * if we need to validate the checkstring sig, then [543ish:799ish] # * if we need the verification key, then [107:436ish] - # * the offset table at [75:107] tells us about the 'ish' + # * the offset table at [75:107] tells us about the 'ish' + # * if we need the encrypted private key, we want [-1216ish:] + # * but we can't read from negative offsets + # * the offset table tells us the 'ish', also the positive offset # A future version of the SMDF slot format should consider using # fixed-size slots so we can retrieve less data. For now, we'll just # read 2000 bytes, which also happens to read enough actual data to @@ -442,6 +479,9 @@ class ServermapUpdater: if mode == MODE_CHECK: # we use unpack_prefix_and_signature, so we need 1k self._read_size = 1000 + self._need_privkey = False + if mode == MODE_WRITE and not self._node._privkey: + self._need_privkey = True prefix = storage.si_b2a(self._storage_index)[:5] self._log_number = log.msg("SharemapUpdater(%s): starting" % prefix) @@ -494,9 +534,6 @@ class ServermapUpdater: # might not wait for all of their answers to come back) self.num_peers_to_query = k + self.EPSILON - # TODO: initial_peers_to_query needs to be ordered list of (peerid, - # ss) tuples - if self.mode == MODE_CHECK: initial_peers_to_query = dict(full_peerlist) must_query = set(initial_peers_to_query.keys()) @@ -509,12 +546,11 @@ class ServermapUpdater: initial_peers_to_query, must_query = self._build_initial_querylist() self.required_num_empty_peers = self.EPSILON - # TODO: also populate self._filenode._privkey + # TODO: arrange to read lots of data from k-ish servers, to avoid + # the extra round trip required to read large directories. This + # might also avoid the round trip required to read the encrypted + # private key. - # TODO: arrange to read 3KB from one peer who is likely to hold a - # share, so we can avoid the latency of that extra roundtrip. 3KB - # would get us the encprivkey from a dirnode with up to 7 - # entries, allowing us to make an update in 2 RTT instead of 3. else: initial_peers_to_query, must_query = self._build_initial_querylist() @@ -531,10 +567,8 @@ class ServermapUpdater: # contains the overflow (peers that we should tap if we don't get # enough responses) - d = defer.succeed(initial_peers_to_query) - d.addCallback(self._send_initial_requests) - d.addCallback(lambda res: self._done_deferred) - return d + self._send_initial_requests(initial_peers_to_query) + return self._done_deferred def _build_initial_querylist(self): initial_peers_to_query = {} @@ -566,10 +600,6 @@ class ServermapUpdater: # might produce a result. return None - def _do_read(self, ss, peerid, storage_index, shnums, readv): - d = ss.callRemote("slot_readv", storage_index, shnums, readv) - return d - def _do_query(self, ss, peerid, storage_index, readsize): self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d", peerid=idlib.shortnodeid_b2a(peerid), @@ -586,63 +616,18 @@ class ServermapUpdater: # _query_failed) get logged, but we still want to check for doneness. d.addErrback(log.err) d.addBoth(self._check_for_done) - d.addErrback(log.err) + d.addErrback(self._fatal_error) return d - def _deserialize_pubkey(self, pubkey_s): - verifier = rsa.create_verifying_key_from_string(pubkey_s) - return verifier - - def _try_to_extract_privkey(self, data, peerid, shnum): - try: - r = unpack_share(data) - except NeedMoreDataError, e: - # this share won't help us. oh well. - offset = e.encprivkey_offset - length = e.encprivkey_length - self.log("shnum %d on peerid %s: share was too short (%dB) " - "to get the encprivkey; [%d:%d] ought to hold it" % - (shnum, idlib.shortnodeid_b2a(peerid), len(data), - offset, offset+length)) - # NOTE: if uncoordinated writes are taking place, someone might - # change the share (and most probably move the encprivkey) before - # we get a chance to do one of these reads and fetch it. This - # will cause us to see a NotEnoughPeersError(unable to fetch - # privkey) instead of an UncoordinatedWriteError . This is a - # nuisance, but it will go away when we move to DSA-based mutable - # files (since the privkey will be small enough to fit in the - # write cap). - - self._encprivkey_shares.append( (peerid, shnum, offset, length)) - return - - (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey, signature, share_hash_chain, block_hash_tree, - share_data, enc_privkey) = r - - return self._try_to_validate_privkey(enc_privkey, peerid, shnum) - - def _try_to_validate_privkey(self, enc_privkey, peerid, shnum): - alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) - alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) - if alleged_writekey != self._writekey: - self.log("invalid privkey from %s shnum %d" % - (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD) - return - - # it's good - self.log("got valid privkey from shnum %d on peerid %s" % - (shnum, idlib.shortnodeid_b2a(peerid))) - self._privkey = rsa.create_signing_key_from_string(alleged_privkey_s) - self._encprivkey = enc_privkey - self._node._populate_encprivkey(self._encprivkey) - self._node._populate_privkey(self._privkey) + def _do_read(self, ss, peerid, storage_index, shnums, readv): + d = ss.callRemote("slot_readv", storage_index, shnums, readv) + return d def _got_results(self, datavs, peerid, readsize, stuff, started): - self.log(format="got result from [%(peerid)s], %(numshares)d shares", - peerid=idlib.shortnodeid_b2a(peerid), - numshares=len(datavs), - level=log.NOISY) + lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares", + peerid=idlib.shortnodeid_b2a(peerid), + numshares=len(datavs), + level=log.NOISY) self._queries_outstanding.discard(peerid) self._must_query.discard(peerid) self._queries_completed += 1 @@ -655,10 +640,14 @@ class ServermapUpdater: else: self._empty_peers.add(peerid) + last_verinfo = None + last_shnum = None for shnum,datav in datavs.items(): data = datav[0] try: - self._got_results_one_share(shnum, data, peerid) + verinfo = self._got_results_one_share(shnum, data, peerid) + last_verinfo = verinfo + last_shnum = shnum except CorruptShareError, e: # log it and give the other shares a chance to be processed f = failure.Failure() @@ -667,8 +656,27 @@ class ServermapUpdater: self._last_failure = f self._servermap.problems.append(f) pass + + if self._need_privkey and last_verinfo: + # send them a request for the privkey. We send one request per + # server. + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = last_verinfo + o = dict(offsets_tuple) + + self._queries_outstanding.add(peerid) + readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ] + ss = self._servermap.connections[peerid] + d = self._do_read(ss, peerid, self._storage_index, + [last_shnum], readv) + d.addCallback(self._got_privkey_results, peerid, last_shnum) + d.addErrback(self._privkey_query_failed, peerid, last_shnum) + d.addErrback(log.err) + d.addCallback(self._check_for_done) + d.addErrback(self._fatal_error) + # all done! - self.log("DONE") + self.log("_got_results done", parent=lp) def _got_results_one_share(self, shnum, data, peerid): self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s", @@ -689,6 +697,9 @@ class ServermapUpdater: "pubkey doesn't match fingerprint") self._node._pubkey = self._deserialize_pubkey(pubkey_s) + if self._need_privkey: + self._try_to_extract_privkey(data, peerid, shnum) + (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N, ig_segsize, ig_datalen, offsets) = unpack_header(data) offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) @@ -716,6 +727,57 @@ class ServermapUpdater: self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp)) # and the versionmap self.versionmap.add(verinfo, (shnum, peerid, timestamp)) + return verinfo + + def _deserialize_pubkey(self, pubkey_s): + verifier = rsa.create_verifying_key_from_string(pubkey_s) + return verifier + + def _try_to_extract_privkey(self, data, peerid, shnum): + try: + r = unpack_share(data) + except NeedMoreDataError, e: + # this share won't help us. oh well. + offset = e.encprivkey_offset + length = e.encprivkey_length + self.log("shnum %d on peerid %s: share was too short (%dB) " + "to get the encprivkey; [%d:%d] ought to hold it" % + (shnum, idlib.shortnodeid_b2a(peerid), len(data), + offset, offset+length)) + # NOTE: if uncoordinated writes are taking place, someone might + # change the share (and most probably move the encprivkey) before + # we get a chance to do one of these reads and fetch it. This + # will cause us to see a NotEnoughPeersError(unable to fetch + # privkey) instead of an UncoordinatedWriteError . This is a + # nuisance, but it will go away when we move to DSA-based mutable + # files (since the privkey will be small enough to fit in the + # write cap). + + self._encprivkey_shares.append( (peerid, shnum, offset, length)) + return + + (seqnum, root_hash, IV, k, N, segsize, datalen, + pubkey, signature, share_hash_chain, block_hash_tree, + share_data, enc_privkey) = r + + return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum) + + def _try_to_validate_privkey(self, enc_privkey, peerid, shnum): + + alleged_privkey_s = self._node._decrypt_privkey(enc_privkey) + alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s) + if alleged_writekey != self._writekey: + self.log("invalid privkey from %s shnum %d" % + (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD) + return + + # it's good + self.log("got valid privkey from shnum %d on peerid %s" % + (shnum, idlib.shortnodeid_b2a(peerid))) + privkey = rsa.create_signing_key_from_string(alleged_privkey_s) + self._node._populate_encprivkey(enc_privkey) + self._node._populate_privkey(privkey) + self._need_privkey = False def _query_failed(self, f, peerid): @@ -730,6 +792,27 @@ class ServermapUpdater: self._queries_completed += 1 self._last_failure = f + def _got_privkey_results(self, datavs, peerid, shnum): + self._queries_outstanding.discard(peerid) + if not self._need_privkey: + return + if shnum not in datavs: + self.log("privkey wasn't there when we asked it", level=log.WEIRD) + return + datav = datavs[shnum] + enc_privkey = datav[0] + self._try_to_validate_privkey(enc_privkey, peerid, shnum) + + def _privkey_query_failed(self, f, peerid, shnum): + self._queries_outstanding.discard(peerid) + self.log("error during privkey query: %s %s" % (f, f.value), + level=log.WEIRD) + if not self._running: + return + self._queries_outstanding.discard(peerid) + self._servermap.problems.append(f) + self._last_failure = f + def _check_for_done(self, res): # exit paths: # return self._send_more_queries(outstanding) : send some more queries @@ -821,6 +904,8 @@ class ServermapUpdater: num_not_responded = 0 num_not_found = 0 states = [] + found_boundary = False + for i,(peerid,ss) in enumerate(self.full_peerlist): if peerid in self._bad_peers: # query failed @@ -835,14 +920,8 @@ class ServermapUpdater: if num_not_found >= self.EPSILON: self.log("MODE_WRITE: found our boundary, %s" % "".join(states)) - # we need to know that we've gotten answers from - # everybody to the left of here - if last_not_responded == -1: - # we're done - self.log("have all our answers") - return self._done() - # still waiting for somebody - return self._send_more_queries(num_not_responded) + found_boundary = True + break elif peerid in self._good_peers: # yes shares @@ -857,6 +936,25 @@ class ServermapUpdater: last_not_responded = i num_not_responded += 1 + if found_boundary: + # we need to know that we've gotten answers from + # everybody to the left of here + if last_not_responded == -1: + # we're done + self.log("have all our answers") + # .. unless we're still waiting on the privkey + if self._need_privkey: + self.log("but we're still waiting for the privkey") + # if we found the boundary but we haven't yet found + # the privkey, we may need to look further. If + # somehow all the privkeys were corrupted (but the + # shares were readable), then this is likely to do an + # exhaustive search. + return self._send_more_queries(MAX_IN_FLIGHT) + return self._done() + # still waiting for somebody + return self._send_more_queries(num_not_responded) + # if we hit here, we didn't find our boundary, so we're still # waiting for peers self.log("MODE_WRITE: no boundary yet, %s" % "".join(states)) @@ -869,11 +967,12 @@ class ServermapUpdater: return self._send_more_queries(MAX_IN_FLIGHT) def _send_more_queries(self, num_outstanding): - assert self.extra_peers # we shouldn't get here with nothing in reserve more_queries = [] while True: - self.log(" there are %d queries outstanding" % len(self._queries_outstanding)) + self.log(format=" there are %(outstanding)d queries outstanding", + outstanding=len(self._queries_outstanding), + level=log.NOISY) active_queries = len(self._queries_outstanding) + len(more_queries) if active_queries >= num_outstanding: break @@ -895,11 +994,15 @@ class ServermapUpdater: if not self._running: return self._running = False - self._servermap.last_update_mode = self._mode + self._servermap.last_update_mode = self.mode self._servermap.last_update_time = self._started # the servermap will not be touched after this eventually(self._done_deferred.callback, self._servermap) + def _fatal_error(self, f): + self.log("fatal error", failure=f, level=log.WEIRD) + self._done_deferred.errback(f) + class Marker: pass @@ -1270,9 +1373,9 @@ class Retrieve: self._running = False # res is either the new contents, or a Failure if isinstance(res, failure.Failure): - self.log("DONE, with failure", failure=res) + self.log("Retrieve done, with failure", failure=res) else: - self.log("DONE, success!: res=%s" % (res,)) + self.log("Retrieve done, success!: res=%s" % (res,)) eventually(self._done_deferred.callback, res) @@ -1345,6 +1448,16 @@ class Publish: To make the initial publish, set servermap to None. """ + # we limit the segment size as usual to constrain our memory footprint. + # The max segsize is higher for mutable files, because we want to support + # dirnodes with up to 10k children, and each child uses about 330 bytes. + # If you actually put that much into a directory you'll be using a + # footprint of around 14MB, which is higher than we'd like, but it is + # more important right now to support large directories than to make + # memory usage small when you use them. Once we implement MDMF (with + # multiple segments), we will drop this back down, probably to 128KiB. + MAX_SEGMENT_SIZE = 3500000 + def __init__(self, filenode, servermap): self._node = filenode self._servermap = servermap @@ -1352,6 +1465,7 @@ class Publish: self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] num = self._node._client.log("Publish(%s): starting" % prefix) self._log_number = num + self._running = True def log(self, *args, **kwargs): if 'parent' not in kwargs: @@ -1381,6 +1495,8 @@ class Publish: self.log("starting publish, datalen is %s" % len(newdata)) + self.done_deferred = defer.Deferred() + self._writekey = self._node.get_writekey() assert self._writekey, "need write capability to publish" @@ -1433,28 +1549,7 @@ class Publish: current_share_peers) # TODO: add an errback too, probably to ignore that peer - # we limit the segment size as usual to constrain our memory - # footprint. The max segsize is higher for mutable files, because we - # want to support dirnodes with up to 10k children, and each child - # uses about 330 bytes. If you actually put that much into a - # directory you'll be using a footprint of around 14MB, which is - # higher than we'd like, but it is more important right now to - # support large directories than to make memory usage small when you - # use them. Once we implement MDMF (with multiple segments), we will - # drop this back down, probably to 128KiB. - self.MAX_SEGMENT_SIZE = 3500000 - - segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata)) - # this must be a multiple of self.required_shares - segment_size = mathutil.next_multiple(segment_size, - self.required_shares) - self.segment_size = segment_size - if segment_size: - self.num_segments = mathutil.div_ceil(len(self.newdata), - segment_size) - else: - self.num_segments = 0 - assert self.num_segments in [0, 1,] # SDMF restrictions + self.setup_encoding_parameters() self.surprised = False @@ -1474,11 +1569,17 @@ class Publish: # When self.placed == self.goal, we're done. self.placed = set() # (peerid, shnum) tuples + # we also keep a mapping from peerid to RemoteReference. Each time we + # pull a connection out of the full peerlist, we add it to this for + # use later. + self.connections = {} + # we use the servermap to populate the initial goal: this way we will # try to update each existing share in place. for (peerid, shares) in self._servermap.servermap.items(): for (shnum, versionid, timestamp) in shares: self.goal.add( (peerid, shnum) ) + self.connections[peerid] = self._servermap.connections[peerid] # create the shares. We'll discard these as they are delivered. SMDF: # we're allowed to hold everything in memory. @@ -1486,27 +1587,53 @@ class Publish: d = self._encrypt_and_encode() d.addCallback(self._generate_shares) d.addCallback(self.loop) # trigger delivery + d.addErrback(self._fatal_error) return self.done_deferred - def loop(self): + def setup_encoding_parameters(self): + segment_size = min(self.MAX_SEGMENT_SIZE, len(self.newdata)) + # this must be a multiple of self.required_shares + segment_size = mathutil.next_multiple(segment_size, + self.required_shares) + self.segment_size = segment_size + if segment_size: + self.num_segments = mathutil.div_ceil(len(self.newdata), + segment_size) + else: + self.num_segments = 0 + assert self.num_segments in [0, 1,] # SDMF restrictions + + def _fatal_error(self, f): + self.log("error during loop", failure=f, level=log.SCARY) + self._done(f) + + def loop(self, ignored=None): + self.log("entering loop", level=log.NOISY) self.update_goal() # how far are we from our goal? needed = self.goal - self.placed - self.outstanding if needed: # we need to send out new shares - d = self.send_shares(needed) + self.log(format="need to send %(needed)d new shares", + needed=len(needed), level=log.NOISY) + d = self._send_shares(needed) d.addCallback(self.loop) d.addErrback(self._fatal_error) return if self.outstanding: # queries are still pending, keep waiting + self.log(format="%(outstanding)d queries still outstanding", + outstanding=len(self.outstanding), + level=log.NOISY) return # no queries outstanding, no placements needed: we're done - return self._done() + self.log("no queries outstanding, no placements needed: done", + level=log.OPERATIONAL) + return self._done(None) def log_goal(self, goal): logmsg = [] @@ -1544,19 +1671,19 @@ class Publish: # TODO: 2: move those shares instead of copying them, to reduce future # update work - # this is CPU intensive but easy to analyze. We create a sort order - # for each peerid. If the peerid is marked as bad, we don't even put - # them in the list. Then we care about the number of shares which - # have already been assigned to them. After that we care about their - # permutation order. + # this is a bit CPU intensive but easy to analyze. We create a sort + # order for each peerid. If the peerid is marked as bad, we don't + # even put them in the list. Then we care about the number of shares + # which have already been assigned to them. After that we care about + # their permutation order. old_assignments = DictOfSets() for (peerid, shnum) in self.goal: old_assignments.add(peerid, shnum) peerlist = [] for i, (peerid, ss) in enumerate(self.full_peerlist): - entry = (len(old_assignments[peerid]), i, peerid, ss) - peerlist.add(entry) + entry = (len(old_assignments.get(peerid, [])), i, peerid, ss) + peerlist.append(entry) peerlist.sort() new_assignments = [] @@ -1566,6 +1693,7 @@ class Publish: for shnum in homeless_shares: (ignored1, ignored2, peerid, ss) = peerlist[i] self.goal.add( (peerid, shnum) ) + self.connections[peerid] = ss i += 1 if i >= len(peerlist): i = 0 @@ -1681,6 +1809,18 @@ class Publish: self.shares = final_shares self.root_hash = root_hash + # we also need to build up the version identifier for what we're + # pushing. Extract the offsets from one of our shares. + assert final_shares + offsets = unpack_header(final_shares.values()[0])[-1] + offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] ) + verinfo = (self._new_seqnum, root_hash, self.salt, + self.segment_size, len(self.newdata), + self.required_shares, self.total_shares, + prefix, offsets_tuple) + self.versioninfo = verinfo + + def _send_shares(self, needed): self.log("_send_shares") @@ -1698,7 +1838,7 @@ class Publish: peermap = DictOfSets() for (peerid, shnum) in needed: - peermap[peerid].add(shnum) + peermap.add(peerid, shnum) # the next thing is to build up a bunch of test vectors. The # semantics of Publish are that we perform the operation if the world @@ -1712,7 +1852,7 @@ class Publish: for (peerid, shnum) in needed: testvs = [] - for (old_shnum, old_versionid, old_timestamp) in sm[peerid]: + for (old_shnum, old_versionid, old_timestamp) in sm.get(peerid,[]): if old_shnum == shnum: # an old version of that share already exists on the # server, according to our servermap. We will create a @@ -1723,12 +1863,23 @@ class Publish: old_checkstring = pack_checkstring(old_seqnum, old_root_hash, old_salt) - testv = [ (0, len(old_checkstring), "eq", old_checkstring) ] + testv = (0, len(old_checkstring), "eq", old_checkstring) testvs.append(testv) break if not testvs: # add a testv that requires the share not exist - testv = [ (0, 1, 'eq', "") ] + #testv = (0, 1, 'eq', "") + + # Unfortunately, foolscap-0.2.5 has a bug in the way inbound + # constraints are handled. If the same object is referenced + # multiple times inside the arguments, foolscap emits a + # 'reference' token instead of a distinct copy of the + # argument. The bug is that these 'reference' tokens are not + # accepted by the inbound constraint code. To work around + # this, we need to prevent python from interning the + # (constant) tuple, by creating a new copy of this vector + # each time. This bug is fixed in later versions of foolscap. + testv = tuple([0, 1, 'eq', ""]) testvs.append(testv) # the write vector is simply the share @@ -1768,7 +1919,7 @@ class Publish: d.addCallbacks(self._got_write_answer, self._got_write_error, callbackArgs=(peerid, shnums, started), errbackArgs=(peerid, shnums, started)) - d.addErrback(self.error, peerid) + d.addErrback(self._fatal_error) dl.append(d) d = defer.DeferredList(dl) @@ -1784,8 +1935,9 @@ class Publish: def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): storage_index = self._storage_index - ss = self._storage_servers[peerid] + ss = self.connections[peerid] + #print "SS[%s] is %s" % (idlib.shortnodeid_b2a(peerid), ss), ss.tracker.interfaceName d = ss.callRemote("slot_testv_and_readv_and_writev", storage_index, secrets, @@ -1798,31 +1950,40 @@ class Publish: idlib.shortnodeid_b2a(peerid)) for shnum in shnums: self.outstanding.discard( (peerid, shnum) ) + sm = self._servermap.servermap wrote, read_data = answer if not wrote: - # TODO: use the checkstring to add information to the log message - #self.log("somebody modified the share on us:" - # " shnum=%d: I thought they had #%d:R=%s," - # " but testv reported #%d:R=%s" % - # (shnum, - # seqnum, base32.b2a(root_hash)[:4], - # old_seqnum, base32.b2a(old_root_hash)[:4]), - # parent=lp, level=log.WEIRD) self.log("our testv failed, so the write did not happen", parent=lp, level=log.WEIRD) self.surprised = True self.bad_peers.add(peerid) # don't ask them again + # use the checkstring to add information to the log message + for (shnum,readv) in read_data.items(): + checkstring = readv[0] + (other_seqnum, + other_roothash, + other_salt) = unpack_checkstring(checkstring) + expected_version = self._servermap.version_on_peer(peerid, + shnum) + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = expected_version + self.log("somebody modified the share on us:" + " shnum=%d: I thought they had #%d:R=%s," + " but testv reported #%d:R=%s" % + (shnum, + seqnum, base32.b2a(root_hash)[:4], + other_seqnum, base32.b2a(other_roothash)[:4]), + parent=lp, level=log.NOISY) # self.loop() will take care of finding new homes return - sm = self._servermap.servermap for shnum in shnums: self.placed.add( (peerid, shnum) ) # and update the servermap. We strip the old entry out.. newset = set([ t - for t in sm[peerid] + for t in sm.get(peerid, []) if t[0] != shnum ]) sm[peerid] = newset # and add a new one @@ -1845,6 +2006,7 @@ class Publish: self.bad_peers.add(peerid) self.log(format="error while writing shares %(shnums)s to peerid %(peerid)s", shnums=list(shnums), peerid=idlib.shortnodeid_b2a(peerid), + failure=f, level=log.UNUSUAL) # self.loop() will take care of checking to see if we're done return @@ -1873,40 +2035,21 @@ class Publish: raise UncoordinatedWriteError("I was surprised!") def _done(self, res): - now = time.time() - self._status.timings["total"] = now - self._started - self._status.set_active(False) - self._status.set_status("Done") - self._status.set_progress(1.0) + if not self._running: + return + self._running = False + #now = time.time() + #self._status.timings["total"] = now - self._started + #self._status.set_active(False) + #self._status.set_status("Done") + #self._status.set_progress(1.0) + self.done_deferred.callback(res) return None def get_status(self): return self._status - def _do_privkey_query(self, rref, peerid, shnum, offset, length): - started = time.time() - d = self._do_read(rref, peerid, self._storage_index, - [shnum], [(offset, length)]) - d.addCallback(self._privkey_query_response, peerid, shnum, started) - return d - - def _privkey_query_response(self, datav, peerid, shnum, started): - elapsed = time.time() - started - self._status.add_per_server_time(peerid, "read", elapsed) - - data = datav[shnum][0] - self._try_to_validate_privkey(data, peerid, shnum) - - elapsed = time.time() - self._privkey_fetch_started - self._status.timings["privkey_fetch"] = elapsed - self._status.privkey_from = peerid - - def _obtain_privkey_done(self, target_info): - elapsed = time.time() - self._obtain_privkey_started - self._status.timings["privkey"] = elapsed - return target_info - # use client.create_mutable_file() to make one of these @@ -1991,13 +2134,6 @@ class MutableFileNode: verifier = signer.get_verifying_key() return verifier, signer - def _publish(self, initial_contents): - p = self.publish_class(self) - self._client.notify_publish(p) - d = p.publish(initial_contents) - d.addCallback(lambda res: self) - return d - def _encrypt_privkey(self, writekey, privkey): enc = AES(writekey) crypttext = enc.process(privkey) @@ -2114,7 +2250,7 @@ class MutableFileNode: d = self.obtain_lock() d.addCallback(lambda res: ServermapUpdater(self, servermap, mode).update()) - d.addCallback(self.release_lock) + d.addBoth(self.release_lock) return d def download_version(self, servermap, versionid): @@ -2122,7 +2258,7 @@ class MutableFileNode: d = self.obtain_lock() d.addCallback(lambda res: Retrieve(self, servermap, versionid).download()) - d.addCallback(self.release_lock) + d.addBoth(self.release_lock) return d def publish(self, servermap, newdata): @@ -2131,7 +2267,7 @@ class MutableFileNode: d.addCallback(lambda res: Publish(self, servermap).publish(newdata)) # p = self.publish_class(self) # self._client.notify_publish(p) - d.addCallback(self.release_lock) + d.addBoth(self.release_lock) return d def modify(self, modifier, *args, **kwargs): @@ -2178,17 +2314,32 @@ class MutableFileNode: def download_to_data(self): d = self.obtain_lock() d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH)) - d.addCallback(lambda smap: - self.download_version(smap, - smap.best_recoverable_version())) - d.addCallback(self.release_lock) + def _updated(smap): + goal = smap.best_recoverable_version() + if not goal: + raise UnrecoverableFileError("no recoverable versions") + return self.download_version(smap, goal) + d.addCallback(_updated) + d.addBoth(self.release_lock) + return d + + def _publish(self, initial_contents): + p = Publish(self, None) + d = p.publish(initial_contents) + d.addCallback(lambda res: self) return d def update(self, newdata): - return self._publish(newdata) + d = self.obtain_lock() + d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE)) + d.addCallback(lambda smap: + Publish(self, smap).publish(newdata)) + d.addBoth(self.release_lock) + return d def overwrite(self, newdata): - return self._publish(newdata) + return self.update(newdata) + class MutableWatcher(service.MultiService): MAX_PUBLISH_STATUSES = 20 diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 1b188c10..32d228b6 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -1,57 +1,29 @@ -import itertools, struct, re +import struct from cStringIO import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python import failure -from allmydata import mutable, uri, dirnode, download +from allmydata import mutable, uri, download from allmydata.util import base32 from allmydata.util.idlib import shortnodeid_b2a from allmydata.util.hashutil import tagged_hash from allmydata.encode import NotEnoughPeersError -from allmydata.interfaces import IURI, INewDirectoryURI, \ - IMutableFileURI, IUploadable, IFileURI -from allmydata.filenode import LiteralFileNode +from allmydata.interfaces import IURI, IMutableFileURI, IUploadable from foolscap.eventual import eventually, fireEventually from foolscap.logging import log import sha -#from allmydata.test.common import FakeMutableFileNode -#FakeFilenode = FakeMutableFileNode +# this "FastMutableFileNode" exists solely to speed up tests by using smaller +# public/private keys. Once we switch to fast DSA-based keys, we can get rid +# of this. -class FakeFilenode(mutable.MutableFileNode): - counter = itertools.count(1) - all_contents = {} - all_rw_friends = {} +class FastMutableFileNode(mutable.MutableFileNode): + SIGNATURE_KEY_SIZE = 522 - def create(self, initial_contents): - d = mutable.MutableFileNode.create(self, initial_contents) - def _then(res): - self.all_contents[self.get_uri()] = initial_contents - return res - d.addCallback(_then) - return d - def init_from_uri(self, myuri): - mutable.MutableFileNode.init_from_uri(self, myuri) - return self - def _generate_pubprivkeys(self, key_size): - count = self.counter.next() - return FakePubKey(count), FakePrivKey(count) - def _publish(self, initial_contents): - self.all_contents[self.get_uri()] = initial_contents - return defer.succeed(self) - - def download_to_data(self): - if self.is_readonly(): - assert self.all_rw_friends.has_key(self.get_uri()), (self.get_uri(), id(self.all_rw_friends)) - return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]]) - else: - return defer.succeed(self.all_contents[self.get_uri()]) - def update(self, newdata): - self.all_contents[self.get_uri()] = newdata - return defer.succeed(None) - def overwrite(self, newdata): - return self.update(newdata) +# this "FakeStorage" exists to put the share data in RAM and avoid using real +# network connections, both to speed up the tests and to reduce the amount of +# non-mutable.py code being exercised. class FakeStorage: # this class replaces the collection of storage servers, allowing the @@ -77,7 +49,7 @@ class FakeStorage: def read(self, peerid, storage_index): shares = self._peers.get(peerid, {}) if self._sequence is None: - return shares + return defer.succeed(shares) d = defer.Deferred() if not self._pending: reactor.callLater(1.0, self._fire_readers) @@ -106,42 +78,68 @@ class FakeStorage: shares[shnum] = f.getvalue() -class FakePublish(mutable.Publish): - - def _do_read(self, ss, peerid, storage_index, shnums, readv): - assert ss[0] == peerid - assert shnums == [] +class FakeStorageServer: + def __init__(self, peerid, storage): + self.peerid = peerid + self.storage = storage + self.queries = 0 + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self, methname) + return meth(*args, **kwargs) d = fireEventually() - d.addCallback(lambda res: self._storage.read(peerid, storage_index)) + d.addCallback(lambda res: _call()) + return d + + def slot_readv(self, storage_index, shnums, readv): + d = self.storage.read(self.peerid, storage_index) + def _read(shares): + response = {} + for shnum in shares: + if shnums and shnum not in shnums: + continue + vector = response[shnum] = [] + for (offset, length) in readv: + assert isinstance(offset, (int, long)), offset + assert isinstance(length, (int, long)), length + vector.append(shares[shnum][offset:offset+length]) + return response + d.addCallback(_read) return d - def _do_testreadwrite(self, peerid, secrets, - tw_vectors, read_vector): - storage_index = self._node._uri.storage_index + def slot_testv_and_readv_and_writev(self, storage_index, secrets, + tw_vectors, read_vector): # always-pass: parrot the test vectors back to them. readv = {} for shnum, (testv, writev, new_length) in tw_vectors.items(): for (offset, length, op, specimen) in testv: assert op in ("le", "eq", "ge") + # TODO: this isn't right, the read is controlled by read_vector, + # not by testv readv[shnum] = [ specimen for (offset, length, op, specimen) in testv ] for (offset, data) in writev: - self._storage.write(peerid, storage_index, shnum, offset, data) + self.storage.write(self.peerid, storage_index, shnum, + offset, data) answer = (True, readv) - return defer.succeed(answer) - + return fireEventually(answer) - -class FakeNewDirectoryNode(dirnode.NewDirectoryNode): - filenode_class = FakeFilenode +# our "FakeClient" has just enough functionality of the real Client to let +# the tests run. class FakeClient: + mutable_file_node_class = FastMutableFileNode + def __init__(self, num_peers=10): + self._storage = FakeStorage() self._num_peers = num_peers self._peerids = [tagged_hash("peerid", "%d" % i)[:20] for i in range(self._num_peers)] + self._connections = dict([(peerid, FakeStorageServer(peerid, + self._storage)) + for peerid in self._peerids]) self.nodeid = "fakenodeid" def log(self, msg, **kw): @@ -152,17 +150,8 @@ class FakeClient: def get_cancel_secret(self): return "I hereby permit you to cancel my leases" - def create_empty_dirnode(self): - n = FakeNewDirectoryNode(self) - d = n.create() - d.addCallback(lambda res: n) - return d - - def create_dirnode_from_uri(self, u): - return FakeNewDirectoryNode(self).init_from_uri(u) - def create_mutable_file(self, contents=""): - n = FakeFilenode(self) + n = self.mutable_file_node_class(self) d = n.create(contents) d.addCallback(lambda res: n) return d @@ -172,25 +161,16 @@ class FakeClient: def create_node_from_uri(self, u): u = IURI(u) - if INewDirectoryURI.providedBy(u): - return self.create_dirnode_from_uri(u) - if IFileURI.providedBy(u): - if isinstance(u, uri.LiteralFileURI): - return LiteralFileNode(u, self) - else: - # CHK - raise RuntimeError("not simulated") assert IMutableFileURI.providedBy(u), u - res = FakeFilenode(self).init_from_uri(u) + res = self.mutable_file_node_class(self).init_from_uri(u) return res def get_permuted_peers(self, service_name, key): """ @return: list of (peerid, connection,) """ - peers_and_connections = [(pid, (pid,)) for pid in self._peerids] results = [] - for peerid, connection in peers_and_connections: + for (peerid, connection) in self._connections.items(): assert isinstance(peerid, str) permuted = sha.new(key + peerid).digest() results.append((permuted, peerid, connection)) @@ -205,33 +185,11 @@ class FakeClient: #d.addCallback(self.create_mutable_file) def _got_data(datav): data = "".join(datav) - #newnode = FakeFilenode(self) + #newnode = FastMutableFileNode(self) return uri.LiteralFileURI(data) d.addCallback(_got_data) return d -class FakePubKey: - def __init__(self, count): - self.count = count - def serialize(self): - return "PUBKEY-%d" % self.count - def verify(self, msg, signature): - if signature[:5] != "SIGN(": - return False - if signature[5:-1] != msg: - return False - if signature[-1] != ")": - return False - return True - -class FakePrivKey: - def __init__(self, count): - self.count = count - def serialize(self): - return "PRIVKEY-%d" % self.count - def sign(self, data): - return "SIGN(%s)" % data - class Filenode(unittest.TestCase): def setUp(self): @@ -240,7 +198,22 @@ class Filenode(unittest.TestCase): def test_create(self): d = self.client.create_mutable_file() def _created(n): - d = n.overwrite("contents 1") + self.failUnless(isinstance(n, FastMutableFileNode)) + peer0 = self.client._peerids[0] + shnums = self.client._storage._peers[peer0].keys() + self.failUnlessEqual(len(shnums), 1) + d.addCallback(_created) + return d + + def test_upload_and_download(self): + d = self.client.create_mutable_file() + def _created(n): + d = defer.succeed(None) + d.addCallback(lambda res: n.update_servermap()) + d.addCallback(lambda smap: smap.dump(StringIO())) + d.addCallback(lambda sio: + self.failUnless("3-of-10" in sio.getvalue())) + d.addCallback(lambda res: n.overwrite("contents 1")) d.addCallback(lambda res: self.failUnlessIdentical(res, None)) d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) @@ -268,40 +241,61 @@ class Filenode(unittest.TestCase): d.addCallback(_created) return d + def test_upload_and_download_full_size_keys(self): + self.client.mutable_file_node_class = mutable.MutableFileNode + d = self.client.create_mutable_file() + def _created(n): + d = defer.succeed(None) + d.addCallback(lambda res: n.update_servermap()) + d.addCallback(lambda smap: smap.dump(StringIO())) + d.addCallback(lambda sio: + self.failUnless("3-of-10" in sio.getvalue())) + d.addCallback(lambda res: n.overwrite("contents 1")) + d.addCallback(lambda res: self.failUnlessIdentical(res, None)) + d.addCallback(lambda res: n.download_to_data()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) + d.addCallback(lambda res: n.overwrite("contents 2")) + d.addCallback(lambda res: n.download_to_data()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + d.addCallback(lambda res: n.download(download.Data())) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + d.addCallback(lambda res: n.update("contents 3")) + d.addCallback(lambda res: n.download_to_data()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3")) + return d + d.addCallback(_created) + return d + class Publish(unittest.TestCase): def test_encrypt(self): c = FakeClient() - fn = FakeFilenode(c) - # .create usually returns a Deferred, but we happen to know it's - # synchronous + fn = FastMutableFileNode(c) CONTENTS = "some initial contents" - fn.create(CONTENTS) - p = mutable.Publish(fn) - target_info = None - d = defer.maybeDeferred(p._encrypt_and_encode, target_info, - CONTENTS, "READKEY", "IV"*8, 3, 10) - def _done( ((shares, share_ids), - required_shares, total_shares, - segsize, data_length, target_info2) ): + d = fn.create(CONTENTS) + def _created(res): + p = mutable.Publish(fn, None) + p.salt = "SALT" * 4 + p.readkey = "\x00" * 16 + p.newdata = CONTENTS + p.required_shares = 3 + p.total_shares = 10 + p.setup_encoding_parameters() + return p._encrypt_and_encode() + d.addCallback(_created) + def _done(shares_and_shareids): + (shares, share_ids) = shares_and_shareids self.failUnlessEqual(len(shares), 10) for sh in shares: self.failUnless(isinstance(sh, str)) self.failUnlessEqual(len(sh), 7) self.failUnlessEqual(len(share_ids), 10) - self.failUnlessEqual(required_shares, 3) - self.failUnlessEqual(total_shares, 10) - self.failUnlessEqual(segsize, 21) - self.failUnlessEqual(data_length, len(CONTENTS)) - self.failUnlessIdentical(target_info, target_info2) d.addCallback(_done) return d def test_generate(self): c = FakeClient() - fn = FakeFilenode(c) - # .create usually returns a Deferred, but we happen to know it's - # synchronous + fn = FastMutableFileNode(c) CONTENTS = "some initial contents" fn.create(CONTENTS) p = mutable.Publish(fn) @@ -328,7 +322,6 @@ class Publish(unittest.TestCase): self.failUnlessEqual(sorted(final_shares.keys()), range(10)) for i,sh in final_shares.items(): self.failUnless(isinstance(sh, str)) - self.failUnlessEqual(len(sh), 381) # feed the share through the unpacker as a sanity-check pieces = mutable.unpack_share(sh) (u_seqnum, u_root_hash, IV, k, N, segsize, datalen, @@ -340,12 +333,12 @@ class Publish(unittest.TestCase): self.failUnlessEqual(N, 10) self.failUnlessEqual(segsize, 21) self.failUnlessEqual(datalen, len(CONTENTS)) - self.failUnlessEqual(pubkey, FakePubKey(0).serialize()) + self.failUnlessEqual(pubkey, p._pubkey.serialize()) sig_material = struct.pack(">BQ32s16s BBQQ", - 0, seqnum, root_hash, IV, + 0, p._new_seqnum, root_hash, IV, k, N, segsize, datalen) - self.failUnlessEqual(signature, - FakePrivKey(0).sign(sig_material)) + self.failUnless(p._pubkey.verify(sig_material, signature)) + #self.failUnlessEqual(signature, p._privkey.sign(sig_material)) self.failUnless(isinstance(share_hash_chain, dict)) self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++ for shnum,share_hash in share_hash_chain.items(): @@ -354,188 +347,36 @@ class Publish(unittest.TestCase): self.failUnlessEqual(len(share_hash), 32) self.failUnless(isinstance(block_hash_tree, list)) self.failUnlessEqual(len(block_hash_tree), 1) # very small tree - self.failUnlessEqual(IV, "IV"*8) + self.failUnlessEqual(IV, "SALT"*4) self.failUnlessEqual(len(share_data), len("%07d" % 1)) - self.failUnlessEqual(enc_privkey, "encprivkey") - self.failUnlessIdentical(target_info, target_info2) - d.addCallback(_done) - return d - - def setup_for_sharemap(self, num_peers): - c = FakeClient(num_peers) - fn = FakeFilenode(c) - s = FakeStorage() - # .create usually returns a Deferred, but we happen to know it's - # synchronous - CONTENTS = "some initial contents" - fn.create(CONTENTS) - p = FakePublish(fn) - p._storage_index = "\x00"*32 - p._new_seqnum = 3 - p._read_size = 1000 - #r = mutable.Retrieve(fn) - p._storage = s - return c, p - - def shouldFail(self, expected_failure, which, call, *args, **kwargs): - substring = kwargs.pop("substring", None) - d = defer.maybeDeferred(call, *args, **kwargs) - def _done(res): - if isinstance(res, failure.Failure): - res.trap(expected_failure) - if substring: - self.failUnless(substring in str(res), - "substring '%s' not in '%s'" - % (substring, str(res))) - else: - self.fail("%s was supposed to raise %s, not get '%s'" % - (which, expected_failure, res)) - d.addBoth(_done) - return d - - def test_sharemap_20newpeers(self): - c, p = self.setup_for_sharemap(20) - - total_shares = 10 - d = p._query_peers(total_shares) - def _done(target_info): - (target_map, shares_per_peer) = target_info - shares_per_peer = {} - for shnum in target_map: - for (peerid, old_seqnum, old_R) in target_map[shnum]: - #print "shnum[%d]: send to %s [oldseqnum=%s]" % \ - # (shnum, idlib.b2a(peerid), old_seqnum) - if peerid not in shares_per_peer: - shares_per_peer[peerid] = 1 - else: - shares_per_peer[peerid] += 1 - # verify that we're sending only one share per peer - for peerid, count in shares_per_peer.items(): - self.failUnlessEqual(count, 1) - d.addCallback(_done) + self.failUnlessEqual(enc_privkey, fn.get_encprivkey()) + d.addCallback(_generated) return d - def test_sharemap_3newpeers(self): - c, p = self.setup_for_sharemap(3) - - total_shares = 10 - d = p._query_peers(total_shares) - def _done(target_info): - (target_map, shares_per_peer) = target_info - shares_per_peer = {} - for shnum in target_map: - for (peerid, old_seqnum, old_R) in target_map[shnum]: - if peerid not in shares_per_peer: - shares_per_peer[peerid] = 1 - else: - shares_per_peer[peerid] += 1 - # verify that we're sending 3 or 4 shares per peer - for peerid, count in shares_per_peer.items(): - self.failUnless(count in (3,4), count) - d.addCallback(_done) - return d - - def test_sharemap_nopeers(self): - c, p = self.setup_for_sharemap(0) - - total_shares = 10 - d = self.shouldFail(NotEnoughPeersError, "test_sharemap_nopeers", - p._query_peers, total_shares) - return d - - def test_write(self): - total_shares = 10 - c, p = self.setup_for_sharemap(20) - p._privkey = FakePrivKey(0) - p._encprivkey = "encprivkey" - p._pubkey = FakePubKey(0) - # make some fake shares - CONTENTS = "some initial contents" - shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) ) - d = defer.maybeDeferred(p._query_peers, total_shares) - IV = "IV"*8 - d.addCallback(lambda target_info: - p._generate_shares( (shares_and_ids, - 3, total_shares, - 21, # segsize - len(CONTENTS), - target_info), - 3, # seqnum - IV)) - d.addCallback(p._send_shares, IV) - def _done((surprised, dispatch_map)): - self.failIf(surprised, "surprised!") - d.addCallback(_done) - return d - -class FakeRetrieve(mutable.Retrieve): - def _do_read(self, ss, peerid, storage_index, shnums, readv): - d = fireEventually() - d.addCallback(lambda res: self._storage.read(peerid, storage_index)) - def _read(shares): - response = {} - for shnum in shares: - if shnums and shnum not in shnums: - continue - vector = response[shnum] = [] - for (offset, length) in readv: - assert isinstance(offset, (int, long)), offset - assert isinstance(length, (int, long)), length - vector.append(shares[shnum][offset:offset+length]) - return response - d.addCallback(_read) - return d - -class FakeServermapUpdater(mutable.ServermapUpdater): - - def _do_read(self, ss, peerid, storage_index, shnums, readv): - d = fireEventually() - d.addCallback(lambda res: self._storage.read(peerid, storage_index)) - def _read(shares): - response = {} - for shnum in shares: - if shnums and shnum not in shnums: - continue - vector = response[shnum] = [] - for (offset, length) in readv: - vector.append(shares[shnum][offset:offset+length]) - return response - d.addCallback(_read) - return d - - def _deserialize_pubkey(self, pubkey_s): - mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s) - if not mo: - raise RuntimeError("mangled pubkey") - count = mo.group(1) - return FakePubKey(int(count)) + # TODO: when we publish to 20 peers, we should get one share per peer on 10 + # when we publish to 3 peers, we should get either 3 or 4 shares per peer + # when we publish to zero peers, we should get a NotEnoughPeersError -class Sharemap(unittest.TestCase): +class Servermap(unittest.TestCase): def setUp(self): # publish a file and create shares, which can then be manipulated # later. num_peers = 20 self._client = FakeClient(num_peers) - self._fn = FakeFilenode(self._client) - self._storage = FakeStorage() - d = self._fn.create("") - def _created(res): - p = FakePublish(self._fn) - p._storage = self._storage - contents = "New contents go here" - return p.publish(contents) + self._storage = self._client._storage + d = self._client.create_mutable_file("New contents go here") + def _created(node): + self._fn = node d.addCallback(_created) return d - def make_servermap(self, storage, mode=mutable.MODE_CHECK): - smu = FakeServermapUpdater(self._fn, mutable.ServerMap(), mode) - smu._storage = storage + def make_servermap(self, mode=mutable.MODE_CHECK): + smu = mutable.ServermapUpdater(self._fn, mutable.ServerMap(), mode) d = smu.update() return d - def update_servermap(self, storage, oldmap, mode=mutable.MODE_CHECK): - smu = FakeServermapUpdater(self._fn, oldmap, mode) - smu._storage = storage + def update_servermap(self, oldmap, mode=mutable.MODE_CHECK): + smu = mutable.ServermapUpdater(self._fn, oldmap, mode) d = smu.update() return d @@ -550,19 +391,18 @@ class Sharemap(unittest.TestCase): return sm def test_basic(self): - s = self._storage # unmangled d = defer.succeed(None) ms = self.make_servermap us = self.update_servermap - d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK)) + d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE)) + d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH)) # this more stops at k+epsilon, and epsilon=k, so 6 shares d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING)) # this mode stops at 'k' shares d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3)) @@ -570,12 +410,12 @@ class Sharemap(unittest.TestCase): # increasing order of number of servers queried, since once a server # gets into the servermap, we'll always ask it for an update. d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3)) - d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ENOUGH)) + d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ENOUGH)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) - d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_WRITE)) - d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_CHECK)) + d.addCallback(lambda sm: us(sm, mode=mutable.MODE_WRITE)) + d.addCallback(lambda sm: us(sm, mode=mutable.MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) - d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ANYTHING)) + d.addCallback(lambda sm: us(sm, mode=mutable.MODE_ANYTHING)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) return d @@ -588,21 +428,20 @@ class Sharemap(unittest.TestCase): self.failUnlessEqual(len(sm.shares_available()), 0) def test_no_shares(self): - s = self._storage - s._peers = {} # delete all shares + self._client._storage._peers = {} # delete all shares ms = self.make_servermap d = defer.succeed(None) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK)) + d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING)) d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE)) + d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE)) d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH)) d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) return d @@ -616,7 +455,7 @@ class Sharemap(unittest.TestCase): self.failUnlessEqual(sm.shares_available().values()[0], (2,3) ) def test_not_quite_enough_shares(self): - s = self._storage + s = self._client._storage ms = self.make_servermap num_shares = len(s._peers) for peerid in s._peers: @@ -629,13 +468,13 @@ class Sharemap(unittest.TestCase): d = defer.succeed(None) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK)) + d.addCallback(lambda res: ms(mode=mutable.MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ANYTHING)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE)) + d.addCallback(lambda res: ms(mode=mutable.MODE_WRITE)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) - d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH)) + d.addCallback(lambda res: ms(mode=mutable.MODE_ENOUGH)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) return d @@ -643,28 +482,23 @@ class Sharemap(unittest.TestCase): class Roundtrip(unittest.TestCase): - def setUp(self): # publish a file and create shares, which can then be manipulated # later. self.CONTENTS = "New contents go here" num_peers = 20 self._client = FakeClient(num_peers) - self._fn = FakeFilenode(self._client) - self._storage = FakeStorage() - d = self._fn.create("") - def _created(res): - p = FakePublish(self._fn) - p._storage = self._storage - return p.publish(self.CONTENTS) + self._storage = self._client._storage + d = self._client.create_mutable_file(self.CONTENTS) + def _created(node): + self._fn = node d.addCallback(_created) return d def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None): if oldmap is None: oldmap = mutable.ServerMap() - smu = FakeServermapUpdater(self._fn, oldmap, mode) - smu._storage = self._storage + smu = mutable.ServermapUpdater(self._fn, oldmap, mode) d = smu.update() return d @@ -693,8 +527,7 @@ class Roundtrip(unittest.TestCase): def do_download(self, servermap, version=None): if version is None: version = servermap.best_recoverable_version() - r = FakeRetrieve(self._fn, servermap, version) - r._storage = self._storage + r = mutable.Retrieve(self._fn, servermap, version) return r.download() def test_basic(self): @@ -796,8 +629,7 @@ class Roundtrip(unittest.TestCase): allproblems = [str(f) for f in servermap.problems] self.failUnless(substring in "".join(allproblems)) return - r = FakeRetrieve(self._fn, servermap, ver) - r._storage = self._storage + r = mutable.Retrieve(self._fn, servermap, ver) if should_succeed: d1 = r.download() d1.addCallback(lambda new_contents: @@ -900,20 +732,33 @@ class Roundtrip(unittest.TestCase): self.failUnless("pubkey doesn't match fingerprint" in str(servermap.problems[0])) ver = servermap.best_recoverable_version() - r = FakeRetrieve(self._fn, servermap, ver) - r._storage = self._storage + r = mutable.Retrieve(self._fn, servermap, ver) return r.download() d.addCallback(_do_retrieve) d.addCallback(lambda new_contents: self.failUnlessEqual(new_contents, self.CONTENTS)) return d - def _encode(self, c, s, fn, k, n, data): + +class MultipleEncodings(unittest.TestCase): + def setUp(self): + self.CONTENTS = "New contents go here" + num_peers = 20 + self._client = FakeClient(num_peers) + self._storage = self._client._storage + d = self._client.create_mutable_file(self.CONTENTS) + def _created(node): + self._fn = node + d.addCallback(_created) + return d + + def _encode(self, k, n, data): # encode 'data' into a peerid->shares dict. - fn2 = FakeFilenode(c) + fn2 = FastMutableFileNode(self._client) # init_from_uri populates _uri, _writekey, _readkey, _storage_index, # and _fingerprint + fn = self._fn fn2.init_from_uri(fn.get_uri()) # then we copy over other fields that are normally fetched from the # existing shares @@ -926,9 +771,9 @@ class Roundtrip(unittest.TestCase): fn2._required_shares = k fn2._total_shares = n - p2 = FakePublish(fn2) - p2._storage = s - p2._storage._peers = {} # clear existing storage + s = self._client._storage + s._peers = {} # clear existing storage + p2 = mutable.Publish(fn2, None) d = p2.publish(data) def _published(res): shares = s._peers @@ -937,29 +782,10 @@ class Roundtrip(unittest.TestCase): d.addCallback(_published) return d -class MultipleEncodings(unittest.TestCase): - - def publish(self): - # publish a file and create shares, which can then be manipulated - # later. - self.CONTENTS = "New contents go here" - num_peers = 20 - self._client = FakeClient(num_peers) - self._fn = FakeFilenode(self._client) - self._storage = FakeStorage() - d = self._fn.create("") - def _created(res): - p = FakePublish(self._fn) - p._storage = self._storage - return p.publish(self.CONTENTS) - d.addCallback(_created) - return d - def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None): if oldmap is None: oldmap = mutable.ServerMap() - smu = FakeServermapUpdater(self._fn, oldmap, mode) - smu._storage = self._storage + smu = mutable.ServermapUpdater(self._fn, oldmap, mode) d = smu.update() return d @@ -967,8 +793,6 @@ class MultipleEncodings(unittest.TestCase): # we encode the same file in two different ways (3-of-10 and 4-of-9), # then mix up the shares, to make sure that download survives seeing # a variety of encodings. This is actually kind of tricky to set up. - c, s, fn, p, r = self.setup_for_publish(20) - # we ignore fn, p, and r contents1 = "Contents for encoding 1 (3-of-10) go here" contents2 = "Contents for encoding 2 (4-of-9) go here" @@ -976,19 +800,19 @@ class MultipleEncodings(unittest.TestCase): # we make a retrieval object that doesn't know what encoding # parameters to use - fn3 = FakeFilenode(c) - fn3.init_from_uri(fn.get_uri()) + fn3 = FastMutableFileNode(self._client) + fn3.init_from_uri(self._fn.get_uri()) # now we upload a file through fn1, and grab its shares - d = self._encode(c, s, fn, 3, 10, contents1) + d = self._encode(3, 10, contents1) def _encoded_1(shares): self._shares1 = shares d.addCallback(_encoded_1) - d.addCallback(lambda res: self._encode(c, s, fn, 4, 9, contents2)) + d.addCallback(lambda res: self._encode(4, 9, contents2)) def _encoded_2(shares): self._shares2 = shares d.addCallback(_encoded_2) - d.addCallback(lambda res: self._encode(c, s, fn, 4, 7, contents3)) + d.addCallback(lambda res: self._encode(4, 7, contents3)) def _encoded_3(shares): self._shares3 = shares d.addCallback(_encoded_3) @@ -1021,14 +845,14 @@ class MultipleEncodings(unittest.TestCase): sharemap = {} - for i,peerid in enumerate(c._peerids): + for i,peerid in enumerate(self._client._peerids): peerid_s = shortnodeid_b2a(peerid) for shnum in self._shares1.get(peerid, {}): if shnum < len(places): which = places[shnum] else: which = "x" - s._peers[peerid] = peers = {} + self._client._storage._peers[peerid] = peers = {} in_1 = shnum in self._shares1[peerid] in_2 = shnum in self._shares2.get(peerid, {}) in_3 = shnum in self._shares3.get(peerid, {}) @@ -1050,14 +874,10 @@ class MultipleEncodings(unittest.TestCase): # now sort the sequence so that share 0 is returned first new_sequence = [sharemap[shnum] for shnum in sorted(sharemap.keys())] - s._sequence = new_sequence + self._client._storage._sequence = new_sequence log.msg("merge done") d.addCallback(_merge) - def _retrieve(res): - r3 = FakeRetrieve(fn3) - r3._storage = s - return r3.retrieve() - d.addCallback(_retrieve) + d.addCallback(lambda res: fn3.download_to_data()) def _retrieved(new_contents): # the current specified behavior is "first version recoverable" self.failUnlessEqual(new_contents, contents1) @@ -1147,4 +967,3 @@ class Utils(unittest.TestCase): c.add("v1", 1, 10, xdata[10:20], "time1") #self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0")) -