From: Brian Warner Date: Tue, 15 Jan 2008 04:24:26 +0000 (-0700) Subject: offloaded: improve logging, pass through options, get ready for testing interrupted... X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=168a8c3b73fd4dfae0e4a7bc463b3f00ab3227c0;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: improve logging, pass through options, get ready for testing interrupted uploads. test_system: add (disabled) interrupted-upload test --- diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 102ae447..979204f9 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -4,6 +4,7 @@ from twisted.application import service from twisted.internet import defer from foolscap import Referenceable from allmydata import upload, interfaces +from allmydata.util import idlib @@ -14,17 +15,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): """ implements(interfaces.RICHKUploadHelper) - def __init__(self, storage_index, helper): + def __init__(self, storage_index, helper, log_number, options={}): self._finished = False self._storage_index = storage_index self._helper = helper - self._log_number = self._helper.log("CHKUploadHelper starting") + upload_id = idlib.b2a(storage_index)[:6] + self._log_number = log_number + self._helper.log("CHKUploadHelper starting for SI %s" % upload_id, + parent=log_number) self._client = helper.parent - self._options = {} + self._options = options + self._readers = [] self.set_params( (3,7,10) ) # GACK + def log(self, *args, **kwargs): + if 'facility' not in kwargs: + kwargs['facility'] = "tahoe.helper" + return upload.CHKUploader.log(self, *args, **kwargs) + def start(self): # determine if we need to upload the file. If so, return ({},self) . # If not, return (UploadResults,None) . @@ -35,6 +45,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # reader is an RIEncryptedUploadable. I am specified to return an # UploadResults dictionary. + self._readers.append(reader) + reader.notifyOnDisconnect(self._remove_reader, reader) eu = CiphertextReader(reader, self._storage_index) d = self.start_encrypted(eu) def _done(res): @@ -44,6 +56,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): d.addCallback(_done) return d + def _remove_reader(self, reader): + # NEEDS MORE + self._readers.remove(reader) + if not self._readers: + if not self._finished: + self.finished(None) + def finished(self, res): self._finished = True self._helper.upload_finished(self._storage_index) @@ -89,24 +108,31 @@ class Helper(Referenceable, service.MultiService): # and send the request off to them. If nobody has it, we'll choose a # helper at random. + name = "helper" chk_upload_helper_class = CHKUploadHelper def __init__(self, basedir): self._basedir = basedir + self._chk_options = {} self._active_uploads = {} service.MultiService.__init__(self) - def log(self, msg, **kwargs): + def log(self, *args, **kwargs): if 'facility' not in kwargs: - kwargs['facility'] = "helper" - return self.parent.log(msg, **kwargs) + kwargs['facility'] = "tahoe.helper" + return self.parent.log(*args, **kwargs) def remote_upload_chk(self, storage_index): + lp = self.log(format="helper: upload_chk query for SI %(si)s", + si=idlib.b2a(storage_index)) # TODO: look on disk if storage_index in self._active_uploads: + self.log("upload is currently active", parent=lp) uh = self._active_uploads[storage_index] else: - uh = self.chk_upload_helper_class(storage_index, self) + self.log("creating new upload helper", parent=lp) + uh = self.chk_upload_helper_class(storage_index, self, lp, + self._chk_options) self._active_uploads[storage_index] = uh return uh.start() diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 5a978d00..d787fad3 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -5,6 +5,7 @@ from cStringIO import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.internet import threads # CLI tests use deferToThread +from twisted.internet.error import ConnectionDone from twisted.application import service from allmydata import client, uri, download, upload, storage, mutable from allmydata.introducer import IntroducerNode @@ -12,7 +13,8 @@ from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI from allmydata.mutable import NotMutableError -from foolscap.eventual import flushEventualQueue +from foolscap.eventual import fireEventually, flushEventualQueue +from foolscap import DeadReferenceError from twisted.python import log from twisted.python.failure import Failure from twisted.web.client import getPage @@ -87,6 +89,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): f = open(os.path.join(basedirs[0],"private","helper.furl"), "r") helper_furl = f.read() f.close() + self.helper_furl = helper_furl f = open(os.path.join(basedirs[3],"helper.furl"), "w") f.write(helper_furl) f.close() @@ -107,18 +110,27 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(_connected) return d - def add_extra_node(self, client_num): - # this node is *not* parented to our self.sparent, so we can shut it - # down separately from the rest, to exercise the connection-lost code + def add_extra_node(self, client_num, helper_furl=None, + add_to_sparent=False): + # usually this node is *not* parented to our self.sparent, so we can + # shut it down separately from the rest, to exercise the + # connection-lost code basedir = self.getdir("client%d" % client_num) if not os.path.isdir(basedir): fileutil.make_dirs(basedir) open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) + if helper_furl: + f = open(os.path.join(basedir, "helper.furl") ,"w") + f.write(helper_furl+"\n") + f.close() c = client.Client(basedir=basedir) self.clients.append(c) self.numclients += 1 - c.startService() + if add_to_sparent: + c.setServiceParent(self.sparent) + else: + c.startService() d = self.wait_for_connections() d.addCallback(lambda res: c) return d @@ -257,10 +269,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): return d1 d.addCallback(_download_nonexistent_uri) + # add a new node, which doesn't accept shares, and only uses the + # helper for upload. + d.addCallback(lambda res: self.add_extra_node(self.numclients, + self.helper_furl, + add_to_sparent=True)) + def _added(extra_node): + self.extra_node = extra_node + extra_node.getServiceNamed("storageserver").sizelimit = 0 + d.addCallback(_added) + def _upload_with_helper(res): DATA = "Data that needs help to upload" * 1000 u = upload.Data(DATA) - d = self.clients[3].upload(u) + d = self.extra_node.upload(u) def _uploaded(uri): return self.downloader.download_to_data(uri) d.addCallback(_uploaded) @@ -270,6 +292,104 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): return d d.addCallback(_upload_with_helper) + def _upload_resumable(res): + DATA = "Data that needs help to upload and gets interrupted" * 1000 + u = upload.Data(DATA) + # interrupt the first upload after 5kB + print "GOING" + from allmydata.util import log + options = {"debug_interrupt": 5000, + "debug_stash_RemoteEncryptedUploadable": True, + } + # sneak into the helper and reduce its segment size, so that our + # debug_interrupt will sever the connection on about the fifth + # segment fetched. This makes sure that we've started to write + # the new shares before we abandon them, which exercises the + # abort/delete-partial-share code. + o2 = {"max_segment_size": 1000} + self.clients[0].getServiceNamed("helper")._chk_options = o2 + + d = self.extra_node.upload(u, options) + def _eee(res): + log.msg("EEE: %s" % (res,)) + print "EEE", res + d2 = defer.Deferred() + reactor.callLater(3, d2.callback, None) + return d2 + #d.addBoth(_eee) + #return d + + def _should_not_finish(res): + self.fail("interrupted upload should have failed, not finished" + " with result %s" % (res,)) + def _interrupted(f): + print "interrupted" + log.msg("interrupted", level=log.WEIRD, failure=f) + f.trap(ConnectionDone, DeadReferenceError) + reu = options["RemoteEncryptedUploabable"] + print "REU.bytes", reu._bytes_read + # make sure we actually interrupted it before finishing the + # file + self.failUnless(reu._bytes_read < len(DATA), + "read %d out of %d total" % (reu._bytes_read, + len(DATA))) + log.msg("waiting for reconnect", level=log.WEIRD) + # now, we need to give the nodes a chance to notice that this + # connection has gone away. When this happens, the storage + # servers will be told to abort their uploads, removing the + # partial shares. Unfortunately this involves TCP messages + # going through the loopback interface, and we can't easily + # predict how long that will take. If it were all local, we + # could use fireEventually() to stall. Since we don't have + # the right introduction hooks, the best we can do is use a + # fixed delay. TODO: this is fragile. + return self.stall(None, 2.0) + d.addCallbacks(_should_not_finish, _interrupted) + + def _disconnected(res): + # check to make sure the storage servers aren't still hanging + # on to the partial share: their incoming/ directories should + # now be empty. + print "disconnected" + log.msg("disconnected", level=log.WEIRD) + for i in range(self.numclients): + incdir = os.path.join(self.getdir("client%d" % i), + "storage", "shares", "incoming") + self.failUnlessEqual(os.listdir(incdir), []) + d.addCallback(_disconnected) + + def _wait_for_reconnect(res): + # then we need to give the reconnector a chance to + # reestablish the connection to the helper. + d.addCallback(lambda res: log.msg("wait_for_connections", + level=log.WEIRD)) + d.addCallback(lambda res: self.wait_for_connections()) + d.addCallback(_wait_for_reconnect) + options2 = {"debug_stash_RemoteEncryptedUploadable": True} + def _upload_again(res): + print "uploading again" + log.msg("uploading again", level=log.WEIRD) + return self.extra_node.upload(u, options2) + d.addCallbacks(_upload_again) + + def _uploaded(uri): + log.msg("I think its uploaded", level=log.WEIRD) + print "I tunk its uploaded", uri + reu = options2["RemoteEncryptedUploabable"] + print "REU.bytes", reu._bytes_read + # make sure we didn't read the whole file the second time + # around + self.failUnless(reu._bytes_read < len(DATA), + "resumption didn't save us any work: read %d bytes out of %d total" % + (reu._bytes_read, len(DATA))) + return self.downloader.download_to_data(uri) + d.addCallback(_uploaded) + def _check(newdata): + self.failUnlessEqual(newdata, DATA) + d.addCallback(_check) + return d + #d.addCallback(_upload_resumable) + return d test_upload_and_download.timeout = 4800