]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
offloaded: update unit tests: assert that interrupt/resume works, and that the helper...
authorBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:18:10 +0000 (01:18 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 17 Jan 2008 08:18:10 +0000 (01:18 -0700)
src/allmydata/test/test_helper.py
src/allmydata/test/test_system.py

index 6eee330ca8c0b300c5d3de51f03159d269552ac8..7707a15158a1318810c8486d1d5992921904d2ef 100644 (file)
@@ -1,4 +1,5 @@
 
+import os
 from twisted.trial import unittest
 from twisted.application import service
 
@@ -6,7 +7,7 @@ from foolscap import Tub, eventual
 from foolscap.logging import log
 
 from allmydata import upload, offloaded
-from allmydata.util import hashutil
+from allmydata.util import hashutil, fileutil
 
 MiB = 1024*1024
 
@@ -61,10 +62,12 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
         t.setLocation("bogus:1234")
 
-        self.helper = h = offloaded.Helper(".")
+    def setUpHelper(self, basedir):
+        fileutil.make_dirs(basedir)
+        self.helper = h = offloaded.Helper(basedir)
         h.chk_upload_helper_class = CHKUploadHelper_fake
         h.setServiceParent(self.s)
-        self.helper_furl = t.registerReference(h)
+        self.helper_furl = self.tub.registerReference(h)
 
     def tearDown(self):
         d = self.s.stopService()
@@ -74,6 +77,8 @@ class AssistedUpload(unittest.TestCase):
 
 
     def test_one(self):
+        self.basedir = "helper/AssistedUpload/test_one"
+        self.setUpHelper(self.basedir)
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)
 
@@ -92,10 +97,19 @@ class AssistedUpload(unittest.TestCase):
             assert "CHK" in uri
         d.addCallback(_uploaded)
 
+        def _check_empty(res):
+            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+            self.failUnlessEqual(files, [])
+            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+            self.failUnlessEqual(files, [])
+        d.addCallback(_check_empty)
+
         return d
 
 
     def test_already_uploaded(self):
+        self.basedir = "helper/AssistedUpload/test_already_uploaded"
+        self.setUpHelper(self.basedir)
         self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)
@@ -115,4 +129,11 @@ class AssistedUpload(unittest.TestCase):
             assert "CHK" in uri
         d.addCallback(_uploaded)
 
+        def _check_empty(res):
+            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+            self.failUnlessEqual(files, [])
+            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+            self.failUnlessEqual(files, [])
+        d.addCallback(_check_empty)
+
         return d
index 95085b1ade00722409d828579793717c59ac4c26..518184b080002ca3f2157e54d65a1e8d03e7a8fb 100644 (file)
@@ -7,15 +7,15 @@ from twisted.internet import defer, reactor
 from twisted.internet import threads # CLI tests use deferToThread
 from twisted.internet.error import ConnectionDone
 from twisted.application import service
-from allmydata import client, uri, download, upload, storage, mutable
+from allmydata import client, uri, download, upload, storage, mutable, offloaded
 from allmydata.introducer import IntroducerNode
 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
+from allmydata.util import log
 from allmydata.scripts import runner
 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
 from allmydata.mutable import NotMutableError
 from foolscap.eventual import flushEventualQueue
 from foolscap import DeadReferenceError
-from twisted.python import log
 from twisted.python.failure import Failure
 from twisted.web.client import getPage
 from twisted.web.error import Error
@@ -32,13 +32,6 @@ This is some data to publish to the virtual drive, which needs to be large
 enough to not fit inside a LIT uri.
 """
 
-class SmallSegmentDataUploadable(upload.Data):
-    def __init__(self, max_segment_size, *args, **kwargs):
-        self._max_segment_size = max_segment_size
-        upload.Data.__init__(self, *args, **kwargs)
-    def get_maximum_segment_size(self):
-        return defer.succeed(self._max_segment_size)
-
 class SystemTest(testutil.SignalMixin, unittest.TestCase):
 
     def setUp(self):
@@ -210,7 +203,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             # tail segment is not the same length as the others. This actualy
             # gets rounded up to 1025 to be a multiple of the number of
             # required shares (since we use 25 out of 100 FEC).
-            d1 = u.upload(SmallSegmentDataUploadable(1024, DATA))
+            up = upload.Data(DATA)
+            up.max_segment_size = 1024
+            d1 = u.upload(up)
             return d1
         d.addCallback(_do_upload)
         def _upload_done(uri):
@@ -226,7 +221,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             # the roothash), we have to do all of the encoding work, and only
             # get to save on the upload part.
             log.msg("UPLOADING AGAIN")
-            d1 = self.uploader.upload(SmallSegmentDataUploadable(1024, DATA))
+            up = upload.Data(DATA)
+            up.max_segment_size = 1024
+            d1 = self.uploader.upload(up)
         d.addCallback(_upload_again)
 
         def _download_to_data(res):
@@ -299,38 +296,37 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
 
         def _upload_resumable(res):
             DATA = "Data that needs help to upload and gets interrupted" * 1000
-            u = upload.Data(DATA)
-            # interrupt the first upload after 5kB
-            print "GOING"
-            from allmydata.util import log
-            options = {"debug_interrupt": 5000,
-                       "debug_stash_RemoteEncryptedUploadable": True,
-                       }
-            # sneak into the helper and reduce its segment size, so that our
+            u1 = upload.Data(DATA)
+            u2 = upload.Data(DATA)
+
+            # tell the upload to drop the connection after about 5kB
+            u1.debug_interrupt = 5000
+            u1.debug_stash_RemoteEncryptedUploadable = True
+            u2.debug_stash_RemoteEncryptedUploadable = True
+            # sneak into the helper and reduce its chunk size, so that our
             # debug_interrupt will sever the connection on about the fifth
-            # segment fetched. This makes sure that we've started to write
-            # the new shares before we abandon them, which exercises the
-            # abort/delete-partial-share code.
-            o2 = {"max_segment_size": 1000}
-            self.clients[0].getServiceNamed("helper")._chk_options = o2
+            # chunk fetched. This makes sure that we've started to write the
+            # new shares before we abandon them, which exercises the
+            # abort/delete-partial-share code. TODO: find a cleaner way to do
+            # this. I know that this will affect later uses of the helper in
+            # this same test run, but I'm not currently worried about it.
+            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
 
-            d = self.extra_node.upload(u, options)
+            d = self.extra_node.upload(u1)
 
             def _should_not_finish(res):
                 self.fail("interrupted upload should have failed, not finished"
                           " with result %s" % (res,))
             def _interrupted(f):
-                print "interrupted"
-                log.msg("interrupted", level=log.WEIRD, failure=f)
                 f.trap(ConnectionDone, DeadReferenceError)
-                reu = options["RemoteEncryptedUploadable"]
-                print "REU.bytes", reu._bytes_read
+                reu = u1.debug_RemoteEncryptedUploadable
                 # make sure we actually interrupted it before finishing the
                 # file
-                self.failUnless(reu._bytes_read < len(DATA),
-                                "read %d out of %d total" % (reu._bytes_read,
+                self.failUnless(reu._bytes_sent < len(DATA),
+                                "read %d out of %d total" % (reu._bytes_sent,
                                                              len(DATA)))
-                log.msg("waiting for reconnect", level=log.WEIRD)
+                log.msg("waiting for reconnect", level=log.NOISY,
+                        facility="tahoe.test.test_system")
                 # now, we need to give the nodes a chance to notice that this
                 # connection has gone away. When this happens, the storage
                 # servers will be told to abort their uploads, removing the
@@ -347,8 +343,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
                 # check to make sure the storage servers aren't still hanging
                 # on to the partial share: their incoming/ directories should
                 # now be empty.
-                print "disconnected"
-                log.msg("disconnected", level=log.WEIRD)
+                log.msg("disconnected", level=log.NOISY,
+                        facility="tahoe.test.test_system")
                 for i in range(self.numclients):
                     incdir = os.path.join(self.getdir("client%d" % i),
                                           "storage", "shares", "incoming")
@@ -358,35 +354,43 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             def _wait_for_reconnect(res):
                 # then we need to give the reconnector a chance to
                 # reestablish the connection to the helper.
-                d.addCallback(lambda res: log.msg("wait_for_connections",
-                                                  level=log.WEIRD))
+                d.addCallback(lambda res:
+                              log.msg("wait_for_connections", level=log.NOISY,
+                                      facility="tahoe.test.test_system"))
                 d.addCallback(lambda res: self.wait_for_connections())
             d.addCallback(_wait_for_reconnect)
-            options2 = {"debug_stash_RemoteEncryptedUploadable": True}
+
             def _upload_again(res):
-                print "uploading again"
-                log.msg("uploading again", level=log.WEIRD)
-                return self.extra_node.upload(u, options2)
+                log.msg("uploading again", level=log.NOISY,
+                        facility="tahoe.test.test_system")
+                return self.extra_node.upload(u2)
             d.addCallbacks(_upload_again)
 
             def _uploaded(uri):
-                log.msg("I think its uploaded", level=log.WEIRD)
-                print "I tunk its uploaded", uri
-                reu = options2["RemoteEncryptedUploadable"]
-                print "REU.bytes", reu._bytes_read
+                log.msg("Second upload complete", level=log.NOISY,
+                        facility="tahoe.test.test_system")
+                reu = u2.debug_RemoteEncryptedUploadable
                 # make sure we didn't read the whole file the second time
                 # around
-                #self.failUnless(reu._bytes_read < len(DATA),
-                #                "resumption didn't save us any work:"
-                #                " read %d bytes out of %d total" %
-                #                (reu._bytes_read, len(DATA)))
+                self.failUnless(reu._bytes_sent < len(DATA),
+                                "resumption didn't save us any work:"
+                                " read %d bytes out of %d total" %
+                                (reu._bytes_sent, len(DATA)))
                 return self.downloader.download_to_data(uri)
             d.addCallback(_uploaded)
+
             def _check(newdata):
                 self.failUnlessEqual(newdata, DATA)
+                # also check that the helper has removed the temp file from
+                # its directories
+                basedir = os.path.join(self.getdir("client0"), "helper")
+                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
+                self.failUnlessEqual(files, [])
+                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
+                self.failUnlessEqual(files, [])
             d.addCallback(_check)
             return d
-        #d.addCallback(_upload_resumable)
+        d.addCallback(_upload_resumable)
 
         return d
     test_upload_and_download.timeout = 4800