git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
offloaded: improve logging, pass through options, get ready for testing interrupted...
authorBrian Warner <warner@allmydata.com>
Tue, 15 Jan 2008 04:24:26 +0000 (21:24 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 15 Jan 2008 04:24:26 +0000 (21:24 -0700)
src/allmydata/offloaded.py
src/allmydata/test/test_system.py

index 102ae44727b6924fbbbd84850f10aee985fc0910..979204f9b86f54e62b85911862ad9ba1efebb9a9 100644 (file)
@@ -4,6 +4,7 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 from allmydata import upload, interfaces
+from allmydata.util import idlib
 
 
 
@@ -14,17 +15,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     """
     implements(interfaces.RICHKUploadHelper)
 
-    def __init__(self, storage_index, helper):
+    def __init__(self, storage_index, helper, log_number, options={}):
         self._finished = False
         self._storage_index = storage_index
         self._helper = helper
-        self._log_number = self._helper.log("CHKUploadHelper starting")
+        upload_id = idlib.b2a(storage_index)[:6]
+        self._log_number = log_number
+        self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
+                         parent=log_number)
 
         self._client = helper.parent
-        self._options = {}
+        self._options = options
+        self._readers = []
 
         self.set_params( (3,7,10) ) # GACK
 
+    def log(self, *args, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "tahoe.helper"
+        return upload.CHKUploader.log(self, *args, **kwargs)
+
     def start(self):
         # determine if we need to upload the file. If so, return ({},self) .
         # If not, return (UploadResults,None) .
@@ -35,6 +45,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # reader is an RIEncryptedUploadable. I am specified to return an
         # UploadResults dictionary.
 
+        self._readers.append(reader)
+        reader.notifyOnDisconnect(self._remove_reader, reader)
         eu = CiphertextReader(reader, self._storage_index)
         d = self.start_encrypted(eu)
         def _done(res):
@@ -44,6 +56,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         d.addCallback(_done)
         return d
 
+    def _remove_reader(self, reader):
+        # NEEDS MORE
+        self._readers.remove(reader)
+        if not self._readers:
+            if not self._finished:
+                self.finished(None)
+
     def finished(self, res):
         self._finished = True
         self._helper.upload_finished(self._storage_index)
@@ -89,24 +108,31 @@ class Helper(Referenceable, service.MultiService):
     # and send the request off to them. If nobody has it, we'll choose a
     # helper at random.
 
+    name = "helper"
     chk_upload_helper_class = CHKUploadHelper
 
     def __init__(self, basedir):
         self._basedir = basedir
+        self._chk_options = {}
         self._active_uploads = {}
         service.MultiService.__init__(self)
 
-    def log(self, msg, **kwargs):
+    def log(self, *args, **kwargs):
         if 'facility' not in kwargs:
-            kwargs['facility'] = "helper"
-        return self.parent.log(msg, **kwargs)
+            kwargs['facility'] = "tahoe.helper"
+        return self.parent.log(*args, **kwargs)
 
     def remote_upload_chk(self, storage_index):
+        lp = self.log(format="helper: upload_chk query for SI %(si)s",
+                      si=idlib.b2a(storage_index))
         # TODO: look on disk
         if storage_index in self._active_uploads:
+            self.log("upload is currently active", parent=lp)
             uh = self._active_uploads[storage_index]
         else:
-            uh = self.chk_upload_helper_class(storage_index, self)
+            self.log("creating new upload helper", parent=lp)
+            uh = self.chk_upload_helper_class(storage_index, self, lp,
+                                              self._chk_options)
             self._active_uploads[storage_index] = uh
         return uh.start()
 
index 5a978d005e54eb292c137b27158ccf3ceab7dda5..d787fad36ff69fd3b855823309c23efd22b1188d 100644 (file)
@@ -5,6 +5,7 @@ from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.internet import threads # CLI tests use deferToThread
+from twisted.internet.error import ConnectionDone
 from twisted.application import service
 from allmydata import client, uri, download, upload, storage, mutable
 from allmydata.introducer import IntroducerNode
@@ -12,7 +13,8 @@ from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
 from allmydata.scripts import runner
 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
 from allmydata.mutable import NotMutableError
-from foolscap.eventual import flushEventualQueue
+from foolscap.eventual import fireEventually, flushEventualQueue
+from foolscap import DeadReferenceError
 from twisted.python import log
 from twisted.python.failure import Failure
 from twisted.web.client import getPage
@@ -87,6 +89,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
             helper_furl = f.read()
             f.close()
+            self.helper_furl = helper_furl
             f = open(os.path.join(basedirs[3],"helper.furl"), "w")
             f.write(helper_furl)
             f.close()
@@ -107,18 +110,27 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(_connected)
         return d
 
-    def add_extra_node(self, client_num):
-        # this node is *not* parented to our self.sparent, so we can shut it
-        # down separately from the rest, to exercise the connection-lost code
+    def add_extra_node(self, client_num, helper_furl=None,
+                       add_to_sparent=False):
+        # usually this node is *not* parented to our self.sparent, so we can
+        # shut it down separately from the rest, to exercise the
+        # connection-lost code
         basedir = self.getdir("client%d" % client_num)
         if not os.path.isdir(basedir):
             fileutil.make_dirs(basedir)
         open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+        if helper_furl:
+            f = open(os.path.join(basedir, "helper.furl") ,"w")
+            f.write(helper_furl+"\n")
+            f.close()
 
         c = client.Client(basedir=basedir)
         self.clients.append(c)
         self.numclients += 1
-        c.startService()
+        if add_to_sparent:
+            c.setServiceParent(self.sparent)
+        else:
+            c.startService()
         d = self.wait_for_connections()
         d.addCallback(lambda res: c)
         return d
@@ -257,10 +269,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             return d1
         d.addCallback(_download_nonexistent_uri)
 
+        # add a new node, which doesn't accept shares, and only uses the
+        # helper for upload.
+        d.addCallback(lambda res: self.add_extra_node(self.numclients,
+                                                      self.helper_furl,
+                                                      add_to_sparent=True))
+        def _added(extra_node):
+            self.extra_node = extra_node
+            extra_node.getServiceNamed("storageserver").sizelimit = 0
+        d.addCallback(_added)
+
         def _upload_with_helper(res):
             DATA = "Data that needs help to upload" * 1000
             u = upload.Data(DATA)
-            d = self.clients[3].upload(u)
+            d = self.extra_node.upload(u)
             def _uploaded(uri):
                 return self.downloader.download_to_data(uri)
             d.addCallback(_uploaded)
@@ -270,6 +292,104 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             return d
         d.addCallback(_upload_with_helper)
 
+        def _upload_resumable(res):
+            DATA = "Data that needs help to upload and gets interrupted" * 1000
+            u = upload.Data(DATA)
+            # interrupt the first upload after 5kB
+            print "GOING"
+            from allmydata.util import log
+            options = {"debug_interrupt": 5000,
+                       "debug_stash_RemoteEncryptedUploadable": True,
+                       }
+            # sneak into the helper and reduce its segment size, so that our
+            # debug_interrupt will sever the connection on about the fifth
+            # segment fetched. This makes sure that we've started to write
+            # the new shares before we abandon them, which exercises the
+            # abort/delete-partial-share code.
+            o2 = {"max_segment_size": 1000}
+            self.clients[0].getServiceNamed("helper")._chk_options = o2
+
+            d = self.extra_node.upload(u, options)
+            def _eee(res):
+                log.msg("EEE: %s" % (res,))
+                print "EEE", res
+                d2 = defer.Deferred()
+                reactor.callLater(3, d2.callback, None)
+                return d2
+            #d.addBoth(_eee)
+            #return d
+
+            def _should_not_finish(res):
+                self.fail("interrupted upload should have failed, not finished"
+                          " with result %s" % (res,))
+            def _interrupted(f):
+                print "interrupted"
+                log.msg("interrupted", level=log.WEIRD, failure=f)
+                f.trap(ConnectionDone, DeadReferenceError)
+                reu = options["RemoteEncryptedUploabable"]
+                print "REU.bytes", reu._bytes_read
+                # make sure we actually interrupted it before finishing the
+                # file
+                self.failUnless(reu._bytes_read < len(DATA),
+                                "read %d out of %d total" % (reu._bytes_read,
+                                                             len(DATA)))
+                log.msg("waiting for reconnect", level=log.WEIRD)
+                # now, we need to give the nodes a chance to notice that this
+                # connection has gone away. When this happens, the storage
+                # servers will be told to abort their uploads, removing the
+                # partial shares. Unfortunately this involves TCP messages
+                # going through the loopback interface, and we can't easily
+                # predict how long that will take. If it were all local, we
+                # could use fireEventually() to stall. Since we don't have
+                # the right introduction hooks, the best we can do is use a
+                # fixed delay. TODO: this is fragile.
+                return self.stall(None, 2.0)
+            d.addCallbacks(_should_not_finish, _interrupted)
+
+            def _disconnected(res):
+                # check to make sure the storage servers aren't still hanging
+                # on to the partial share: their incoming/ directories should
+                # now be empty.
+                print "disconnected"
+                log.msg("disconnected", level=log.WEIRD)
+                for i in range(self.numclients):
+                    incdir = os.path.join(self.getdir("client%d" % i),
+                                          "storage", "shares", "incoming")
+                    self.failUnlessEqual(os.listdir(incdir), [])
+            d.addCallback(_disconnected)
+
+            def _wait_for_reconnect(res):
+                # then we need to give the reconnector a chance to
+                # reestablish the connection to the helper.
+                d.addCallback(lambda res: log.msg("wait_for_connections",
+                                                  level=log.WEIRD))
+                d.addCallback(lambda res: self.wait_for_connections())
+            d.addCallback(_wait_for_reconnect)
+            options2 = {"debug_stash_RemoteEncryptedUploadable": True}
+            def _upload_again(res):
+                print "uploading again"
+                log.msg("uploading again", level=log.WEIRD)
+                return self.extra_node.upload(u, options2)
+            d.addCallbacks(_upload_again)
+
+            def _uploaded(uri):
+                log.msg("I think its uploaded", level=log.WEIRD)
+                print "I tunk its uploaded", uri
+                reu = options2["RemoteEncryptedUploabable"]
+                print "REU.bytes", reu._bytes_read
+                # make sure we didn't read the whole file the second time
+                # around
+                self.failUnless(reu._bytes_read < len(DATA),
+                                "resumption didn't save us any work: read %d bytes out of %d total" %
+                                (reu._bytes_read, len(DATA)))
+                return self.downloader.download_to_data(uri)
+            d.addCallback(_uploaded)
+            def _check(newdata):
+                self.failUnlessEqual(newdata, DATA)
+            d.addCallback(_check)
+            return d
+        #d.addCallback(_upload_resumable)
+
         return d
     test_upload_and_download.timeout = 4800