from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData
-from foolscap.api import DeadReferenceError
+from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
if convergence is not None:
d.addCallback(_upload_duplicate_with_helper)
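+ # fireEventually() returns a Deferred that fires with the same result on a
+ # later reactor turn, letting pending events settle before the next step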
+ d.addCallback(fireEventually)
+
def _upload_resumable(res):
DATA = "Data that needs help to upload and gets interrupted" * 1000
u1 = CountingDataUploadable(DATA, convergence=convergence)
u2 = CountingDataUploadable(DATA, convergence=convergence)
# we interrupt the connection after about 5kB by shutting down
- # the helper, then restartingit.
+ # the helper, then restarting it.
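+ # (the uploadable counts bytes as it is read and fires interrupt_after_d
+ # once bytes_read exceeds interrupt_after)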
u1.interrupt_after = 5000
u1.interrupt_after_d = defer.Deferred()
- u1.interrupt_after_d.addCallback(lambda res:
- self.bounce_client(0))
+ bounced_d = defer.Deferred()
+ def _do_bounce(res):
+ d = self.bounce_client(0)
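+ # addBoth routes both success and failure of the bounce into bounced_d,
+ # so it always fires once the restart has completed (or failed)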
+ d.addBoth(bounced_d.callback)
+ u1.interrupt_after_d.addCallback(_do_bounce)
# sneak into the helper and reduce its chunk size, so that our
# debug_interrupt will sever the connection on about the fifth
# chunk fetched. I know this will affect later uses of the helper in
# this same test run, but I'm not currently worried about it.
offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
- d = self.extra_node.upload(u1)
-
- def _should_not_finish(res):
- self.fail("interrupted upload should have failed, not finished"
- " with result %s" % (res,))
- def _interrupted(f):
- f.trap(DeadReferenceError)
-
- # make sure we actually interrupted it before finishing the
- # file
- self.failUnless(u1.bytes_read < len(DATA),
- "read %d out of %d total" % (u1.bytes_read,
- len(DATA)))
-
- log.msg("waiting for reconnect", level=log.NOISY,
- facility="tahoe.test.test_system")
- # now, we need to give the nodes a chance to notice that this
- # connection has gone away. When this happens, the storage
- # servers will be told to abort their uploads, removing the
- # partial shares. Unfortunately this involves TCP messages
- # going through the loopback interface, and we can't easily
- # predict how long that will take. If it were all local, we
- # could use fireEventually() to stall. Since we don't have
- # the right introduction hooks, the best we can do is use a
- # fixed delay. TODO: this is fragile.
- u1.interrupt_after_d.addCallback(self.stall, 2.0)
- return u1.interrupt_after_d
- d.addCallbacks(_should_not_finish, _interrupted)
+ upload_d = self.extra_node.upload(u1)
+ # The upload will start, and bounce_client() will be called after
+ # about 5kB. bounced_d will fire after bounce_client() finishes
+ # shutting down and restarting the node.
+ d = bounced_d
+ def _bounced(ign):
+ # By this point, the upload should have failed because of the
+ # interruption. upload_d will fire with that failure in a moment.
+ def _should_not_finish(res):
+ self.fail("interrupted upload should have failed, not"
+ " finished with result %s" % (res,))
+ def _interrupted(f):
+ f.trap(DeadReferenceError)
+ # make sure we actually interrupted it before finishing
+ # the file
+ self.failUnless(u1.bytes_read < len(DATA),
+ "read %d out of %d total" %
+ (u1.bytes_read, len(DATA)))
+ upload_d.addCallbacks(_should_not_finish, _interrupted)
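+ # returning upload_d chains it into the outer Deferred, so the test
+ # waits here until the upload's outcome has been checked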
+ return upload_d
+ d.addCallback(_bounced)
def _disconnected(res):
# check to make sure the storage servers aren't still hanging
d.addCallback(lambda res:
log.msg("wait_for_connections", level=log.NOISY,
facility="tahoe.test.test_system"))
- d.addCallback(lambda res: self.wait_for_connections())
-
d.addCallback(lambda res:
log.msg("uploading again", level=log.NOISY,