From: Brian Warner Date: Thu, 17 Jan 2008 08:18:10 +0000 (-0700) Subject: offloaded: update unit tests: assert that interrupt/resume works, and that the helper... X-Git-Tag: allmydata-tahoe-0.8.0~296 X-Git-Url: https://git.rkrishnan.org/pf/index.php?a=commitdiff_plain;h=fd0dc3013c3da1bb800974a1b56deda2ea86bb6b;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: update unit tests: assert that interrupt/resume works, and that the helper deletes tempfiles --- diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 6eee330c..7707a151 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -1,4 +1,5 @@ +import os from twisted.trial import unittest from twisted.application import service @@ -6,7 +7,7 @@ from foolscap import Tub, eventual from foolscap.logging import log from allmydata import upload, offloaded -from allmydata.util import hashutil +from allmydata.util import hashutil, fileutil MiB = 1024*1024 @@ -61,10 +62,12 @@ class AssistedUpload(unittest.TestCase): # bogus host/port t.setLocation("bogus:1234") - self.helper = h = offloaded.Helper(".") + def setUpHelper(self, basedir): + fileutil.make_dirs(basedir) + self.helper = h = offloaded.Helper(basedir) h.chk_upload_helper_class = CHKUploadHelper_fake h.setServiceParent(self.s) - self.helper_furl = t.registerReference(h) + self.helper_furl = self.tub.registerReference(h) def tearDown(self): d = self.s.stopService() @@ -74,6 +77,8 @@ class AssistedUpload(unittest.TestCase): def test_one(self): + self.basedir = "helper/AssistedUpload/test_one" + self.setUpHelper(self.basedir) u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) @@ -92,10 +97,19 @@ class AssistedUpload(unittest.TestCase): assert "CHK" in uri d.addCallback(_uploaded) + def _check_empty(res): + files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) + self.failUnlessEqual(files, []) + files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) + self.failUnlessEqual(files, []) + d.addCallback(_check_empty) + return d def test_already_uploaded(self): + self.basedir = "helper/AssistedUpload/test_already_uploaded" + self.setUpHelper(self.basedir) self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) @@ -115,4 +129,11 @@ class AssistedUpload(unittest.TestCase): assert "CHK" in uri d.addCallback(_uploaded) + def _check_empty(res): + files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) + self.failUnlessEqual(files, []) + files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) + self.failUnlessEqual(files, []) + d.addCallback(_check_empty) + return d diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 95085b1a..518184b0 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -7,15 +7,15 @@ from twisted.internet import defer, reactor from twisted.internet import threads # CLI tests use deferToThread from twisted.internet.error import ConnectionDone from twisted.application import service -from allmydata import client, uri, download, upload, storage, mutable +from allmydata import client, uri, download, upload, storage, mutable, offloaded from allmydata.introducer import IntroducerNode from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil +from allmydata.util import log from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI from allmydata.mutable import NotMutableError from foolscap.eventual import flushEventualQueue from foolscap import DeadReferenceError -from twisted.python import log from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error @@ -32,13 +32,6 @@ This is some data to publish to the virtual drive, which needs to be large enough to not fit inside a LIT uri. """ -class SmallSegmentDataUploadable(upload.Data): - def __init__(self, max_segment_size, *args, **kwargs): - self._max_segment_size = max_segment_size - upload.Data.__init__(self, *args, **kwargs) - def get_maximum_segment_size(self): - return defer.succeed(self._max_segment_size) - class SystemTest(testutil.SignalMixin, unittest.TestCase): def setUp(self): @@ -210,7 +203,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): # tail segment is not the same length as the others. This actualy # gets rounded up to 1025 to be a multiple of the number of # required shares (since we use 25 out of 100 FEC). - d1 = u.upload(SmallSegmentDataUploadable(1024, DATA)) + up = upload.Data(DATA) + up.max_segment_size = 1024 + d1 = u.upload(up) return d1 d.addCallback(_do_upload) def _upload_done(uri): @@ -226,7 +221,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): # the roothash), we have to do all of the encoding work, and only # get to save on the upload part. log.msg("UPLOADING AGAIN") - d1 = self.uploader.upload(SmallSegmentDataUploadable(1024, DATA)) + up = upload.Data(DATA) + up.max_segment_size = 1024 + d1 = self.uploader.upload(up) d.addCallback(_upload_again) def _download_to_data(res): @@ -299,38 +296,37 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): def _upload_resumable(res): DATA = "Data that needs help to upload and gets interrupted" * 1000 - u = upload.Data(DATA) - # interrupt the first upload after 5kB - print "GOING" - from allmydata.util import log - options = {"debug_interrupt": 5000, - "debug_stash_RemoteEncryptedUploadable": True, - } - # sneak into the helper and reduce its segment size, so that our + u1 = upload.Data(DATA) + u2 = upload.Data(DATA) + + # tell the upload to drop the connection after about 5kB + u1.debug_interrupt = 5000 + u1.debug_stash_RemoteEncryptedUploadable = True + u2.debug_stash_RemoteEncryptedUploadable = True + # sneak into the helper and reduce its chunk size, so that our # debug_interrupt will sever the connection on about the fifth - # segment fetched. This makes sure that we've started to write - # the new shares before we abandon them, which exercises the - # abort/delete-partial-share code. - o2 = {"max_segment_size": 1000} - self.clients[0].getServiceNamed("helper")._chk_options = o2 + # chunk fetched. This makes sure that we've started to write the + # new shares before we abandon them, which exercises the + # abort/delete-partial-share code. TODO: find a cleaner way to do + # this. I know that this will affect later uses of the helper in + # this same test run, but I'm not currently worried about it. + offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000 - d = self.extra_node.upload(u, options) + d = self.extra_node.upload(u1) def _should_not_finish(res): self.fail("interrupted upload should have failed, not finished" " with result %s" % (res,)) def _interrupted(f): - print "interrupted" - log.msg("interrupted", level=log.WEIRD, failure=f) f.trap(ConnectionDone, DeadReferenceError) - reu = options["RemoteEncryptedUploadable"] - print "REU.bytes", reu._bytes_read + reu = u1.debug_RemoteEncryptedUploadable # make sure we actually interrupted it before finishing the # file - self.failUnless(reu._bytes_read < len(DATA), - "read %d out of %d total" % (reu._bytes_read, + self.failUnless(reu._bytes_sent < len(DATA), + "read %d out of %d total" % (reu._bytes_sent, len(DATA))) - log.msg("waiting for reconnect", level=log.WEIRD) + log.msg("waiting for reconnect", level=log.NOISY, + facility="tahoe.test.test_system") # now, we need to give the nodes a chance to notice that this # connection has gone away. When this happens, the storage # servers will be told to abort their uploads, removing the @@ -347,8 +343,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): # check to make sure the storage servers aren't still hanging # on to the partial share: their incoming/ directories should # now be empty. - print "disconnected" - log.msg("disconnected", level=log.WEIRD) + log.msg("disconnected", level=log.NOISY, + facility="tahoe.test.test_system") for i in range(self.numclients): incdir = os.path.join(self.getdir("client%d" % i), "storage", "shares", "incoming") @@ -358,35 +354,43 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): def _wait_for_reconnect(res): # then we need to give the reconnector a chance to # reestablish the connection to the helper. - d.addCallback(lambda res: log.msg("wait_for_connections", - level=log.WEIRD)) + d.addCallback(lambda res: + log.msg("wait_for_connections", level=log.NOISY, + facility="tahoe.test.test_system")) d.addCallback(lambda res: self.wait_for_connections()) d.addCallback(_wait_for_reconnect) - options2 = {"debug_stash_RemoteEncryptedUploadable": True} + def _upload_again(res): - print "uploading again" - log.msg("uploading again", level=log.WEIRD) - return self.extra_node.upload(u, options2) + log.msg("uploading again", level=log.NOISY, + facility="tahoe.test.test_system") + return self.extra_node.upload(u2) d.addCallbacks(_upload_again) def _uploaded(uri): - log.msg("I think its uploaded", level=log.WEIRD) - print "I tunk its uploaded", uri - reu = options2["RemoteEncryptedUploadable"] - print "REU.bytes", reu._bytes_read + log.msg("Second upload complete", level=log.NOISY, + facility="tahoe.test.test_system") + reu = u2.debug_RemoteEncryptedUploadable # make sure we didn't read the whole file the second time # around - #self.failUnless(reu._bytes_read < len(DATA), - # "resumption didn't save us any work:" - # " read %d bytes out of %d total" % - # (reu._bytes_read, len(DATA))) + self.failUnless(reu._bytes_sent < len(DATA), + "resumption didn't save us any work:" + " read %d bytes out of %d total" % + (reu._bytes_sent, len(DATA))) return self.downloader.download_to_data(uri) d.addCallback(_uploaded) + def _check(newdata): self.failUnlessEqual(newdata, DATA) + # also check that the helper has removed the temp file from + # its directories + basedir = os.path.join(self.getdir("client0"), "helper") + files = os.listdir(os.path.join(basedir, "CHK_encoding")) + self.failUnlessEqual(files, []) + files = os.listdir(os.path.join(basedir, "CHK_incoming")) + self.failUnlessEqual(files, []) d.addCallback(_check) return d - #d.addCallback(_upload_resumable) + d.addCallback(_upload_resumable) return d test_upload_and_download.timeout = 4800