From: Brian Warner Date: Tue, 22 Apr 2008 18:49:53 +0000 (-0700) Subject: mutable: test write failures, uncoordinated write detection X-Git-Tag: allmydata-tahoe-1.1.0~196 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/flags/%3C?a=commitdiff_plain;h=e6074f5dfca4721af43f9198662f7b2f3a96d720;p=tahoe-lafs%2Ftahoe-lafs.git mutable: test write failures, uncoordinated write detection --- diff --git a/src/allmydata/mutable/common.py b/src/allmydata/mutable/common.py index 4eeba9a6..a6e2acb3 100644 --- a/src/allmydata/mutable/common.py +++ b/src/allmydata/mutable/common.py @@ -30,6 +30,10 @@ class UncoordinatedWriteError(Exception): class UnrecoverableFileError(Exception): pass +class NotEnoughServersError(Exception): + """There were not enough functioning servers available to place shares + upon.""" + class CorruptShareError(Exception): def __init__(self, peerid, shnum, reason): self.args = (peerid, shnum, reason) diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 749b8483..275ed377 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -101,23 +101,24 @@ class MutableFileNode: contents. Returns a Deferred that fires (with the MutableFileNode instance you should use) when it completes. """ - self._required_shares, self._total_shares = self.DEFAULT_ENCODING d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator) - def _generated( (pubkey, privkey) ): - self._pubkey, self._privkey = pubkey, privkey - pubkey_s = self._pubkey.serialize() - privkey_s = self._privkey.serialize() - self._writekey = hashutil.ssk_writekey_hash(privkey_s) - self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s) - self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) - self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) - self._readkey = self._uri.readkey - self._storage_index = self._uri.storage_index - return self._upload(initial_contents, None) - d.addCallback(_generated) + d.addCallback(self._generated) + d.addCallback(lambda res: self._upload(initial_contents, None)) return d + def _generated(self, (pubkey, privkey) ): + self._pubkey, self._privkey = pubkey, privkey + pubkey_s = self._pubkey.serialize() + privkey_s = self._privkey.serialize() + self._writekey = hashutil.ssk_writekey_hash(privkey_s) + self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s) + self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) + self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) + self._readkey = self._uri.readkey + self._storage_index = self._uri.storage_index + self._required_shares, self._total_shares = self.DEFAULT_ENCODING + def _generate_pubprivkeys(self, keypair_generator): if keypair_generator: return keypair_generator(self.SIGNATURE_KEY_SIZE) diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index 3ef9f831..1d313918 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -11,7 +11,8 @@ from allmydata import hashtree, codec, storage from pycryptopp.cipher.aes import AES from foolscap.eventual import eventually -from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets +from common import MODE_WRITE, DictOfSets, \ + UncoordinatedWriteError, NotEnoughServersError from servermap import ServerMap from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \ unpack_checkstring, SIGNED_PREFIX @@ -110,16 +111,14 @@ class Publish: self._status.set_progress(0.0) self._status.set_active(True) + def get_status(self): + return self._status + def log(self, *args, **kwargs): if 'parent' not in kwargs: kwargs['parent'] = self._log_number return log.msg(*args, **kwargs) - def log_err(self, *args, **kwargs): - if 'parent' not in kwargs: - kwargs['parent'] = self._log_number - return log.err(*args, **kwargs) - def publish(self, newdata): """Publish the filenode's current contents. Returns a Deferred that fires (with None) when the publish has done as much work as it's ever @@ -191,8 +190,16 @@ class Publish: self.setup_encoding_parameters() + # if we experience any surprises (writes which were rejected because + # our test vector did not match, or shares which we didn't expect to + # see), we set this flag and report an UncoordinatedWriteError at the + # end of the publish process. self.surprised = False + # as a failsafe, refuse to iterate through self.loop more than a + # thousand times. + self.looplimit = 1000 + # we keep track of three tables. The first is our goal: which share # we want to see on which servers. This is initially populated by the # existing servermap. @@ -264,19 +271,28 @@ class Publish: self.log("entering loop", level=log.NOISY) if not self._running: return - self.update_goal() - # how far are we from our goal? - needed = self.goal - self.placed - self.outstanding - self._update_status() - if needed: - # we need to send out new shares - self.log(format="need to send %(needed)d new shares", - needed=len(needed), level=log.NOISY) - d = self._send_shares(needed) - d.addCallback(self.loop) - d.addErrback(self._fatal_error) - return + self.looplimit -= 1 + if self.looplimit <= 0: + raise RuntimeError("loop limit exceeded") + + if self.surprised: + # don't send out any new shares, just wait for the outstanding + # ones to be retired. + self.log("currently surprised, so don't send any new shares", + level=log.NOISY) + else: + self.update_goal() + # how far are we from our goal? + needed = self.goal - self.placed - self.outstanding + self._update_status() + + if needed: + # we need to send out new shares + self.log(format="need to send %(needed)d new shares", + needed=len(needed), level=log.NOISY) + self._send_shares(needed) + return if self.outstanding: # queries are still pending, keep waiting @@ -293,9 +309,9 @@ class Publish: self._status.timings["push"] = elapsed return self._done(None) - def log_goal(self, goal): - logmsg = [] - for (peerid, shnum) in goal: + def log_goal(self, goal, message=""): + logmsg = [message] + for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]): logmsg.append("sh%d to [%s]" % (shnum, idlib.shortnodeid_b2a(peerid))) self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY) @@ -303,6 +319,10 @@ class Publish: level=log.NOISY) def update_goal(self): + # if log.recording_noisy + if True: + self.log_goal(self.goal, "before update: ") + # first, remove any bad peers from our goal self.goal = set([ (peerid, shnum) for (peerid, shnum) in self.goal @@ -318,10 +338,6 @@ class Publish: if not homeless_shares: return - # if log.recording_noisy - if False: - self.log_goal(self.goal) - # if an old share X is on a node, put the new share X there too. # TODO: 1: redistribute shares to achieve one-per-peer, by copying # shares from existing peers to new (less-crowded) ones. The @@ -340,21 +356,35 @@ class Publish: peerlist = [] for i, (peerid, ss) in enumerate(self.full_peerlist): + if peerid in self.bad_peers: + continue entry = (len(old_assignments.get(peerid, [])), i, peerid, ss) peerlist.append(entry) peerlist.sort() + if not peerlist: + raise NotEnoughServersError("Ran out of non-bad servers") + new_assignments = [] # we then index this peerlist with an integer, because we may have to # wrap. We update the goal as we go. i = 0 for shnum in homeless_shares: (ignored1, ignored2, peerid, ss) = peerlist[i] + # TODO: if we are forced to send a share to a server that already + # has one, we may have two write requests in flight, and the + # servermap (which was computed before either request was sent) + # won't reflect the new shares, so the second response will cause + # us to be surprised ("unexpected share on peer"), causing the + # publish to fail with an UncoordinatedWriteError. This is + # troublesome but not really a bit problem. Fix it at some point. self.goal.add( (peerid, shnum) ) self.connections[peerid] = ss i += 1 if i >= len(peerlist): i = 0 + if True: + self.log_goal(self.goal, "after update: ") @@ -564,8 +594,8 @@ class Publish: read_vector = [(0, struct.calcsize(SIGNED_PREFIX))] # ok, send the messages! + self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY) started = time.time() - dl = [] for (peerid, tw_vectors) in all_tw_vectors.items(): write_enabler = self._node.get_write_enabler(peerid) @@ -574,16 +604,19 @@ class Publish: secrets = (write_enabler, renew_secret, cancel_secret) shnums = tw_vectors.keys() + for shnum in shnums: + self.outstanding.add( (peerid, shnum) ) + d = self._do_testreadwrite(peerid, secrets, tw_vectors, read_vector) d.addCallbacks(self._got_write_answer, self._got_write_error, callbackArgs=(peerid, shnums, started), errbackArgs=(peerid, shnums, started)) + d.addCallback(self.loop) d.addErrback(self._fatal_error) - dl.append(d) self._update_status() - return defer.DeferredList(dl, fireOnOneErrback=True) # just for testing + self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY) def _do_testreadwrite(self, peerid, secrets, tw_vectors, read_vector): @@ -610,7 +643,29 @@ class Publish: wrote, read_data = answer + surprise_shares = set(read_data.keys()) - set(shnums) + if surprise_shares: + self.log("they had shares %s that we didn't know about" % + (list(surprise_shares),), + parent=lp, level=log.WEIRD) + self.surprised = True + if not wrote: + # TODO: there are two possibilities. The first is that the server + # is full (or just doesn't want to give us any room), which means + # we shouldn't ask them again, but is *not* an indication of an + # uncoordinated write. The second is that our testv failed, which + # *does* indicate an uncoordinated write. We currently don't have + # a way to tell these two apart (in fact, the storage server code + # doesn't have the option of refusing our share). + # + # If the server is full, mark the peer as bad (so we don't ask + # them again), but don't set self.surprised. The loop() will find + # a new server. + # + # If the testv failed, log it, set self.surprised, but don't + # bother adding to self.bad_peers . + self.log("our testv failed, so the write did not happen", parent=lp, level=log.WEIRD) self.surprised = True @@ -623,15 +678,19 @@ class Publish: other_salt) = unpack_checkstring(checkstring) expected_version = self._servermap.version_on_peer(peerid, shnum) - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, - offsets_tuple) = expected_version - self.log("somebody modified the share on us:" - " shnum=%d: I thought they had #%d:R=%s," - " but testv reported #%d:R=%s" % - (shnum, - seqnum, base32.b2a(root_hash)[:4], - other_seqnum, base32.b2a(other_roothash)[:4]), - parent=lp, level=log.NOISY) + if expected_version: + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = expected_version + self.log("somebody modified the share on us:" + " shnum=%d: I thought they had #%d:R=%s," + " but testv reported #%d:R=%s" % + (shnum, + seqnum, base32.b2a(root_hash)[:4], + other_seqnum, base32.b2a(other_roothash)[:4]), + parent=lp, level=log.NOISY) + # if expected_version==None, then we didn't expect to see a + # share on that peer, and the 'surprise_shares' clause above + # will have logged it. # self.loop() will take care of finding new homes return @@ -641,14 +700,6 @@ class Publish: self._servermap.add_new_share(peerid, shnum, self.versioninfo, started) - surprise_shares = set(read_data.keys()) - set(shnums) - if surprise_shares: - self.log("they had shares %s that we didn't know about" % - (list(surprise_shares),), - parent=lp, level=log.WEIRD) - self.surprised = True - return - # self.loop() will take care of checking to see if we're done return @@ -664,28 +715,6 @@ class Publish: return - - def _log_dispatch_map(self, dispatch_map): - for shnum, places in dispatch_map.items(): - sent_to = [(idlib.shortnodeid_b2a(peerid), - seqnum, - base32.b2a(root_hash)[:4]) - for (peerid,seqnum,root_hash) in places] - self.log(" share %d sent to: %s" % (shnum, sent_to), - level=log.NOISY) - - def _maybe_recover(self, (surprised, dispatch_map)): - self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised, - level=log.NOISY) - self._log_dispatch_map(dispatch_map) - if not surprised: - self.log(" no recovery needed") - return - self.log("We need recovery!", level=log.WEIRD) - print "RECOVERY NOT YET IMPLEMENTED" - # but dispatch_map will help us do it - raise UncoordinatedWriteError("I was surprised!") - def _done(self, res): if not self._running: return @@ -694,14 +723,18 @@ class Publish: self._status.timings["total"] = now - self._started self._status.set_active(False) if isinstance(res, failure.Failure): - self.log("Retrieve done, with failure", failure=res) + self.log("Publish done, with failure", failure=res, level=log.WEIRD) self._status.set_status("Failed") + elif self.surprised: + self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL) + self._status.set_status("UncoordinatedWriteError") + # deliver a failure + res = failure.Failure(UncoordinatedWriteError()) + # TODO: recovery else: + self.log("Publish done, success") self._status.set_status("Done") self._status.set_progress(1.0) eventually(self.done_deferred.callback, res) - def get_status(self): - return self._status - diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index bc587af5..7832000b 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -1,12 +1,13 @@ -import struct +import os, struct from cStringIO import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor -from allmydata import uri, download -from allmydata.util import base32, testutil +from allmydata import uri, download, storage +from allmydata.util import base32, testutil, idlib from allmydata.util.idlib import shortnodeid_b2a from allmydata.util.hashutil import tagged_hash +from allmydata.util.fileutil import make_dirs from allmydata.encode import NotEnoughSharesError from allmydata.interfaces import IURI, IMutableFileURI, IUploadable from foolscap.eventual import eventually, fireEventually @@ -1331,3 +1332,140 @@ class Exceptions(unittest.TestCase): ucwe = UncoordinatedWriteError() self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe)) +# we can't do this test with a FakeClient, since it uses FakeStorageServer +# instances which always succeed. So we need a less-fake one. + +class IntentionalError(Exception): + pass + +class LocalWrapper: + def __init__(self, original): + self.original = original + self.broken = False + def callRemote(self, methname, *args, **kwargs): + def _call(): + if self.broken: + raise IntentionalError("I was asked to break") + meth = getattr(self.original, "remote_" + methname) + return meth(*args, **kwargs) + d = fireEventually() + d.addCallback(lambda res: _call()) + return d + +class LessFakeClient(FakeClient): + + def __init__(self, basedir, num_peers=10): + self._num_peers = num_peers + self._peerids = [tagged_hash("peerid", "%d" % i)[:20] + for i in range(self._num_peers)] + self._connections = {} + for peerid in self._peerids: + peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid)) + make_dirs(peerdir) + ss = storage.StorageServer(peerdir) + ss.setNodeID(peerid) + lw = LocalWrapper(ss) + self._connections[peerid] = lw + self.nodeid = "fakenodeid" + + +class Problems(unittest.TestCase, testutil.ShouldFailMixin): + def test_surprise(self): + basedir = os.path.join("mutable/CollidingWrites/test_surprise") + self.client = LessFakeClient(basedir) + d = self.client.create_mutable_file("contents 1") + def _created(n): + d = defer.succeed(None) + d.addCallback(lambda res: n.get_servermap(MODE_WRITE)) + def _got_smap1(smap): + # stash the old state of the file + self.old_map = smap + d.addCallback(_got_smap1) + # then modify the file, leaving the old map untouched + d.addCallback(lambda res: log.msg("starting winning write")) + d.addCallback(lambda res: n.overwrite("contents 2")) + # now attempt to modify the file with the old servermap. This + # will look just like an uncoordinated write, in which every + # single share got updated between our mapupdate and our publish + d.addCallback(lambda res: log.msg("starting doomed write")) + d.addCallback(lambda res: + self.shouldFail(UncoordinatedWriteError, + "test_surprise", None, + n.upload, + "contents 2a", self.old_map)) + return d + d.addCallback(_created) + return d + + def test_unexpected_shares(self): + # upload the file, take a servermap, shut down one of the servers, + # upload it again (causing shares to appear on a new server), then + # upload using the old servermap. The last upload should fail with an + # UncoordinatedWriteError, because of the shares that didn't appear + # in the servermap. + basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares") + self.client = LessFakeClient(basedir) + d = self.client.create_mutable_file("contents 1") + def _created(n): + d = defer.succeed(None) + d.addCallback(lambda res: n.get_servermap(MODE_WRITE)) + def _got_smap1(smap): + # stash the old state of the file + self.old_map = smap + # now shut down one of the servers + peer0 = list(smap.make_sharemap()[0])[0] + self.client._connections.pop(peer0) + # then modify the file, leaving the old map untouched + log.msg("starting winning write") + return n.overwrite("contents 2") + d.addCallback(_got_smap1) + # now attempt to modify the file with the old servermap. This + # will look just like an uncoordinated write, in which every + # single share got updated between our mapupdate and our publish + d.addCallback(lambda res: log.msg("starting doomed write")) + d.addCallback(lambda res: + self.shouldFail(UncoordinatedWriteError, + "test_surprise", None, + n.upload, + "contents 2a", self.old_map)) + return d + d.addCallback(_created) + return d + + def test_bad_server(self): + # Break one server, then create the file: the initial publish should + # complete with an alternate server. Breaking a second server should + # not prevent an update from succeeding either. + basedir = os.path.join("mutable/CollidingWrites/test_bad_server") + self.client = LessFakeClient(basedir, 20) + # to make sure that one of the initial peers is broken, we have to + # get creative. We create the keys, so we can figure out the storage + # index, but we hold off on doing the initial publish until we've + # broken the server on which the first share wants to be stored. + n = FastMutableFileNode(self.client) + d = defer.succeed(None) + d.addCallback(n._generate_pubprivkeys) + d.addCallback(n._generated) + def _break_peer0(res): + si = n.get_storage_index() + peerlist = self.client.get_permuted_peers("storage", si) + peerid0, connection0 = peerlist[0] + peerid1, connection1 = peerlist[1] + connection0.broken = True + self.connection1 = connection1 + d.addCallback(_break_peer0) + # now let the initial publish finally happen + d.addCallback(lambda res: n._upload("contents 1", None)) + # that ought to work + d.addCallback(lambda res: n.download_best_version()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) + # now break the second peer + def _break_peer1(res): + self.connection1.broken = True + d.addCallback(_break_peer1) + d.addCallback(lambda res: n.overwrite("contents 2")) + # that ought to work too + d.addCallback(lambda res: n.download_best_version()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + return d +