from pycryptopp.cipher.aes import AES
from foolscap.eventual import eventually
-from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
+from common import MODE_WRITE, DictOfSets, \
+ UncoordinatedWriteError, NotEnoughServersError
from servermap import ServerMap
from layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
unpack_checkstring, SIGNED_PREFIX
self._status.set_progress(0.0)
self._status.set_active(True)
+ def get_status(self):
+ return self._status
+
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
return log.msg(*args, **kwargs)
- def log_err(self, *args, **kwargs):
- if 'parent' not in kwargs:
- kwargs['parent'] = self._log_number
- return log.err(*args, **kwargs)
-
def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
self.setup_encoding_parameters()
+ # if we experience any surprises (writes which were rejected because
+ # our test vector did not match, or shares which we didn't expect to
+ # see), we set this flag and report an UncoordinatedWriteError at the
+ # end of the publish process.
self.surprised = False
+ # as a failsafe, refuse to iterate through self.loop more than a
+ # thousand times.
+ self.looplimit = 1000
+
# we keep track of three tables. The first is our goal: which share
# we want to see on which servers. This is initially populated by the
# existing servermap.
self.log("entering loop", level=log.NOISY)
if not self._running:
return
- self.update_goal()
- # how far are we from our goal?
- needed = self.goal - self.placed - self.outstanding
- self._update_status()
- if needed:
- # we need to send out new shares
- self.log(format="need to send %(needed)d new shares",
- needed=len(needed), level=log.NOISY)
- d = self._send_shares(needed)
- d.addCallback(self.loop)
- d.addErrback(self._fatal_error)
- return
+ self.looplimit -= 1
+ if self.looplimit <= 0:
+ raise RuntimeError("loop limit exceeded")
+
+ if self.surprised:
+ # don't send out any new shares, just wait for the outstanding
+ # ones to be retired.
+ self.log("currently surprised, so don't send any new shares",
+ level=log.NOISY)
+ else:
+ self.update_goal()
+ # how far are we from our goal?
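+ # goal, placed, and outstanding are all sets of (peerid, shnum) pairs,
+ # so plain set arithmetic tells us which shares still need to be sent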
+ needed = self.goal - self.placed - self.outstanding
+ self._update_status()
+
+ if needed:
+ # we need to send out new shares
+ self.log(format="need to send %(needed)d new shares",
+ needed=len(needed), level=log.NOISY)
+ self._send_shares(needed)
+ return
if self.outstanding:
# queries are still pending, keep waiting
self._status.timings["push"] = elapsed
return self._done(None)
- def log_goal(self, goal):
- logmsg = []
- for (peerid, shnum) in goal:
+ def log_goal(self, goal, message=""):
+ logmsg = [message]
+ for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
logmsg.append("sh%d to [%s]" % (shnum,
idlib.shortnodeid_b2a(peerid)))
self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
level=log.NOISY)
def update_goal(self):
+ # if log.recording_noisy
+ if True:
+ self.log_goal(self.goal, "before update: ")
+
# first, remove any bad peers from our goal
self.goal = set([ (peerid, shnum)
for (peerid, shnum) in self.goal
if not homeless_shares:
return
- # if log.recording_noisy
- if False:
- self.log_goal(self.goal)
-
# if an old share X is on a node, put the new share X there too.
# TODO: 1: redistribute shares to achieve one-per-peer, by copying
# shares from existing peers to new (less-crowded) ones. The
peerlist = []
for i, (peerid, ss) in enumerate(self.full_peerlist):
+ if peerid in self.bad_peers:
+ continue
entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
peerlist.append(entry)
peerlist.sort()
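+ # peerlist is now ordered by (shares already assigned, permuted index),
+ # so the least-loaded peers earliest in the permuted list come first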
+ if not peerlist:
+ raise NotEnoughServersError("Ran out of non-bad servers")
+
new_assignments = []
# we then index this peerlist with an integer, because we may have to
# wrap. We update the goal as we go.
i = 0
for shnum in homeless_shares:
(ignored1, ignored2, peerid, ss) = peerlist[i]
+ # TODO: if we are forced to send a share to a server that already
+ # has one, we may have two write requests in flight, and the
+ # servermap (which was computed before either request was sent)
+ # won't reflect the new shares, so the second response will cause
+ # us to be surprised ("unexpected share on peer"), causing the
+ # publish to fail with an UncoordinatedWriteError. This is
+ # troublesome but not really a big problem. Fix it at some point.
self.goal.add( (peerid, shnum) )
self.connections[peerid] = ss
i += 1
if i >= len(peerlist):
i = 0
+ if True:
+ self.log_goal(self.goal, "after update: ")
read_vector = [(0, struct.calcsize(SIGNED_PREFIX))]
# ok, send the messages!
+ self.log("sending %d shares" % len(all_tw_vectors), level=log.NOISY)
started = time.time()
- dl = []
for (peerid, tw_vectors) in all_tw_vectors.items():
write_enabler = self._node.get_write_enabler(peerid)
secrets = (write_enabler, renew_secret, cancel_secret)
shnums = tw_vectors.keys()
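+ # record each (peerid, shnum) as outstanding; loop() keeps waiting
+ # until these queries are retired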
+ for shnum in shnums:
+ self.outstanding.add( (peerid, shnum) )
+
d = self._do_testreadwrite(peerid, secrets,
tw_vectors, read_vector)
d.addCallbacks(self._got_write_answer, self._got_write_error,
callbackArgs=(peerid, shnums, started),
errbackArgs=(peerid, shnums, started))
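+ # re-enter loop() after each answer, so we can react to surprises or
+ # errors and place any remaining homeless shares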
+ d.addCallback(self.loop)
d.addErrback(self._fatal_error)
- dl.append(d)
self._update_status()
- return defer.DeferredList(dl, fireOnOneErrback=True) # just for testing
+ self.log("%d shares sent" % len(all_tw_vectors), level=log.NOISY)
def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector):
wrote, read_data = answer
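+ # the readv covers every share this peer holds for our storage index,
+ # so any share number we didn't just write to is one we didn't expect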
+ surprise_shares = set(read_data.keys()) - set(shnums)
+ if surprise_shares:
+ self.log("they had shares %s that we didn't know about" %
+ (list(surprise_shares),),
+ parent=lp, level=log.WEIRD)
+ self.surprised = True
+
if not wrote:
+ # TODO: there are two possibilities. The first is that the server
+ # is full (or just doesn't want to give us any room), which means
+ # we shouldn't ask them again, but which is *not* an indication of an
+ # uncoordinated write. The second is that our testv failed, which
+ # *does* indicate an uncoordinated write. We currently don't have
+ # a way to tell these two apart (in fact, the storage server code
+ # doesn't have the option of refusing our share).
+ #
+ # If the server is full, mark the peer as bad (so we don't ask
+ # them again), but don't set self.surprised. The loop() will find
+ # a new server.
+ #
+ # If the testv failed, log it, set self.surprised, but don't
+ # bother adding to self.bad_peers .
+
self.log("our testv failed, so the write did not happen",
parent=lp, level=log.WEIRD)
self.surprised = True
other_salt) = unpack_checkstring(checkstring)
expected_version = self._servermap.version_on_peer(peerid,
shnum)
- (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
- offsets_tuple) = expected_version
- self.log("somebody modified the share on us:"
- " shnum=%d: I thought they had #%d:R=%s,"
- " but testv reported #%d:R=%s" %
- (shnum,
- seqnum, base32.b2a(root_hash)[:4],
- other_seqnum, base32.b2a(other_roothash)[:4]),
- parent=lp, level=log.NOISY)
+ if expected_version:
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = expected_version
+ self.log("somebody modified the share on us:"
+ " shnum=%d: I thought they had #%d:R=%s,"
+ " but testv reported #%d:R=%s" %
+ (shnum,
+ seqnum, base32.b2a(root_hash)[:4],
+ other_seqnum, base32.b2a(other_roothash)[:4]),
+ parent=lp, level=log.NOISY)
+ # if expected_version==None, then we didn't expect to see a
+ # share on that peer, and the 'surprise_shares' clause above
+ # will have logged it.
# self.loop() will take care of finding new homes
return
self._servermap.add_new_share(peerid, shnum,
self.versioninfo, started)
- surprise_shares = set(read_data.keys()) - set(shnums)
- if surprise_shares:
- self.log("they had shares %s that we didn't know about" %
- (list(surprise_shares),),
- parent=lp, level=log.WEIRD)
- self.surprised = True
- return
-
# self.loop() will take care of checking to see if we're done
return
return
-
- def _log_dispatch_map(self, dispatch_map):
- for shnum, places in dispatch_map.items():
- sent_to = [(idlib.shortnodeid_b2a(peerid),
- seqnum,
- base32.b2a(root_hash)[:4])
- for (peerid,seqnum,root_hash) in places]
- self.log(" share %d sent to: %s" % (shnum, sent_to),
- level=log.NOISY)
-
- def _maybe_recover(self, (surprised, dispatch_map)):
- self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
- level=log.NOISY)
- self._log_dispatch_map(dispatch_map)
- if not surprised:
- self.log(" no recovery needed")
- return
- self.log("We need recovery!", level=log.WEIRD)
- print "RECOVERY NOT YET IMPLEMENTED"
- # but dispatch_map will help us do it
- raise UncoordinatedWriteError("I was surprised!")
-
def _done(self, res):
if not self._running:
return
self._status.timings["total"] = now - self._started
self._status.set_active(False)
if isinstance(res, failure.Failure):
- self.log("Retrieve done, with failure", failure=res)
+ self.log("Publish done, with failure", failure=res, level=log.WEIRD)
self._status.set_status("Failed")
+ elif self.surprised:
+ self.log("Publish done, UncoordinatedWriteError", level=log.UNUSUAL)
+ self._status.set_status("UncoordinatedWriteError")
+ # deliver a failure
+ res = failure.Failure(UncoordinatedWriteError())
+ # TODO: recovery
else:
+ self.log("Publish done, success")
self._status.set_status("Done")
self._status.set_progress(1.0)
eventually(self.done_deferred.callback, res)
- def get_status(self):
- return self._status
-
-import struct
+import os, struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
-from allmydata import uri, download
-from allmydata.util import base32, testutil
+from allmydata import uri, download, storage
+from allmydata.util import base32, testutil, idlib
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
+from allmydata.util.fileutil import make_dirs
from allmydata.encode import NotEnoughSharesError
from allmydata.interfaces import IURI, IMutableFileURI, IUploadable
from foolscap.eventual import eventually, fireEventually
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
+# we can't do this test with a FakeClient, since it uses FakeStorageServer
+# instances which always succeed. So we need a less-fake one.
+
+class IntentionalError(Exception):
+ pass
+
+class LocalWrapper:
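+ # Wraps a local StorageServer and stands in for a foolscap RemoteReference:
+ # callRemote() dispatches to the matching remote_* method in a later turn
+ # (via fireEventually), and setting .broken makes every call raise
+ # IntentionalError.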
+ def __init__(self, original):
+ self.original = original
+ self.broken = False
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ if self.broken:
+ raise IntentionalError("I was asked to break")
+ meth = getattr(self.original, "remote_" + methname)
+ return meth(*args, **kwargs)
+ d = fireEventually()
+ d.addCallback(lambda res: _call())
+ return d
+
+class LessFakeClient(FakeClient):
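+ # Like FakeClient, but each peer is backed by a real storage.StorageServer
+ # writing into its own directory under basedir, wrapped in a LocalWrapper
+ # so that individual servers can be marked broken.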
+
+ def __init__(self, basedir, num_peers=10):
+ self._num_peers = num_peers
+ self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
+ for i in range(self._num_peers)]
+ self._connections = {}
+ for peerid in self._peerids:
+ peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
+ make_dirs(peerdir)
+ ss = storage.StorageServer(peerdir)
+ ss.setNodeID(peerid)
+ lw = LocalWrapper(ss)
+ self._connections[peerid] = lw
+ self.nodeid = "fakenodeid"
+
+
+class Problems(unittest.TestCase, testutil.ShouldFailMixin):
+ def test_surprise(self):
+ basedir = os.path.join("mutable/CollidingWrites/test_surprise")
+ self.client = LessFakeClient(basedir)
+ d = self.client.create_mutable_file("contents 1")
+ def _created(n):
+ d = defer.succeed(None)
+ d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+ def _got_smap1(smap):
+ # stash the old state of the file
+ self.old_map = smap
+ d.addCallback(_got_smap1)
+ # then modify the file, leaving the old map untouched
+ d.addCallback(lambda res: log.msg("starting winning write"))
+ d.addCallback(lambda res: n.overwrite("contents 2"))
+ # now attempt to modify the file with the old servermap. This
+ # will look just like an uncoordinated write, in which every
+ # single share got updated between our mapupdate and our publish
+ d.addCallback(lambda res: log.msg("starting doomed write"))
+ d.addCallback(lambda res:
+ self.shouldFail(UncoordinatedWriteError,
+ "test_surprise", None,
+ n.upload,
+ "contents 2a", self.old_map))
+ return d
+ d.addCallback(_created)
+ return d
+
+ def test_unexpected_shares(self):
+ # upload the file, take a servermap, shut down one of the servers,
+ # upload it again (causing shares to appear on a new server), then
+ # upload using the old servermap. The last upload should fail with an
+ # UncoordinatedWriteError, because of the shares that didn't appear
+ # in the servermap.
+ basedir = os.path.join("mutable/CollidingWrites/test_unexpected_shares")
+ self.client = LessFakeClient(basedir)
+ d = self.client.create_mutable_file("contents 1")
+ def _created(n):
+ d = defer.succeed(None)
+ d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
+ def _got_smap1(smap):
+ # stash the old state of the file
+ self.old_map = smap
+ # now shut down one of the servers
+ peer0 = list(smap.make_sharemap()[0])[0]
+ self.client._connections.pop(peer0)
+ # then modify the file, leaving the old map untouched
+ log.msg("starting winning write")
+ return n.overwrite("contents 2")
+ d.addCallback(_got_smap1)
+ # now attempt to modify the file with the old servermap. This
+ # will look just like an uncoordinated write, in which every
+ # single share got updated between our mapupdate and our publish
+ d.addCallback(lambda res: log.msg("starting doomed write"))
+ d.addCallback(lambda res:
+ self.shouldFail(UncoordinatedWriteError,
+ "test_surprise", None,
+ n.upload,
+ "contents 2a", self.old_map))
+ return d
+ d.addCallback(_created)
+ return d
+
+ def test_bad_server(self):
+ # Break one server, then create the file: the initial publish should
+ # complete with an alternate server. Breaking a second server should
+ # not prevent an update from succeeding either.
+ basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
+ self.client = LessFakeClient(basedir, 20)
+ # to make sure that one of the initial peers is broken, we have to
+ # get creative. We create the keys, so we can figure out the storage
+ # index, but we hold off on doing the initial publish until we've
+ # broken the server on which the first share wants to be stored.
+ n = FastMutableFileNode(self.client)
+ d = defer.succeed(None)
+ d.addCallback(n._generate_pubprivkeys)
+ d.addCallback(n._generated)
+ def _break_peer0(res):
+ si = n.get_storage_index()
+ peerlist = self.client.get_permuted_peers("storage", si)
+ peerid0, connection0 = peerlist[0]
+ peerid1, connection1 = peerlist[1]
+ connection0.broken = True
+ self.connection1 = connection1
+ d.addCallback(_break_peer0)
+ # now let the initial publish finally happen
+ d.addCallback(lambda res: n._upload("contents 1", None))
+ # that ought to work
+ d.addCallback(lambda res: n.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+ # now break the second peer
+ def _break_peer1(res):
+ self.connection1.broken = True
+ d.addCallback(_break_peer1)
+ d.addCallback(lambda res: n.overwrite("contents 2"))
+ # that ought to work too
+ d.addCallback(lambda res: n.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ return d
+