from twisted.internet import defer
from foolscap import Referenceable
from allmydata import upload, interfaces
+from allmydata.util import idlib
"""
implements(interfaces.RICHKUploadHelper)
- def __init__(self, storage_index, helper):
+ def __init__(self, storage_index, helper, log_number, options={}):
self._finished = False
self._storage_index = storage_index
self._helper = helper
- self._log_number = self._helper.log("CHKUploadHelper starting")
+ upload_id = idlib.b2a(storage_index)[:6]
+ self._log_number = log_number
+ self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
+ parent=log_number)
self._client = helper.parent
- self._options = {}
+ self._options = options
+ self._readers = []
self.set_params( (3,7,10) ) # GACK
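+ # (the tuple is (needed_shares, shares_of_happiness, total_shares),
+ # i.e. the standard 3-of-10 encoding; the GACK presumably objects to
+ # hardwiring it here rather than honoring per-client defaults)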
+ def log(self, *args, **kwargs):
+ if 'facility' not in kwargs:
+ kwargs['facility'] = "tahoe.helper"
+ return upload.CHKUploader.log(self, *args, **kwargs)
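+ # (with no explicit facility, helper-side log entries are tagged
+ # "tahoe.helper", matching the Helper service's own log() wrapper below)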
+
def start(self):
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .

def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
# UploadResults dictionary.
+ self._readers.append(reader)
+ reader.notifyOnDisconnect(self._remove_reader, reader)
eu = CiphertextReader(reader, self._storage_index)
d = self.start_encrypted(eu)
def _done(res):
d.addCallback(_done)
return d
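+ # CiphertextReader is not shown in this excerpt; it adapts the remote
+ # RIEncryptedUploadable to the local IEncryptedUploadable interface that
+ # start_encrypted() expects. A minimal sketch, assuming it simply
+ # forwards each method over the wire:
+ #
+ # class CiphertextReader:
+ # implements(interfaces.IEncryptedUploadable)
+ # def __init__(self, remote_reader, storage_index):
+ # self.rr = remote_reader
+ # self.storage_index = storage_index
+ # def get_size(self):
+ # return self.rr.callRemote("get_size")
+ # def read_encrypted(self, length):
+ # return self.rr.callRemote("read_encrypted", length)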
+ def _remove_reader(self, reader):
+ # NEEDS MORE
+ self._readers.remove(reader)
+ if not self._readers:
+ if not self._finished:
+ self.finished(None)
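+ # (notifyOnDisconnect() is foolscap's standard RemoteReference hook,
+ # so this fires when a client connection is lost; once the last reader
+ # is gone the upload is treated as abandoned, which the system test
+ # relies on to make the storage servers abort their partial shares)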
+
def finished(self, res):
self._finished = True
self._helper.upload_finished(self._storage_index)
# and send the request off to them. If nobody has it, we'll choose a
# helper at random.
+ name = "helper"
chk_upload_helper_class = CHKUploadHelper
def __init__(self, basedir):
self._basedir = basedir
+ self._chk_options = {}
self._active_uploads = {}
service.MultiService.__init__(self)
- def log(self, msg, **kwargs):
+ def log(self, *args, **kwargs):
if 'facility' not in kwargs:
- kwargs['facility'] = "helper"
- return self.parent.log(msg, **kwargs)
+ kwargs['facility'] = "tahoe.helper"
+ return self.parent.log(*args, **kwargs)
def remote_upload_chk(self, storage_index):
+ lp = self.log(format="helper: upload_chk query for SI %(si)s",
+ si=idlib.b2a(storage_index))
# TODO: look on disk
if storage_index in self._active_uploads:
+ self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
else:
- uh = self.chk_upload_helper_class(storage_index, self)
+ self.log("creating new upload helper", parent=lp)
+ uh = self.chk_upload_helper_class(storage_index, self, lp,
+ self._chk_options)
self._active_uploads[storage_index] = uh
return uh.start()
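+ # CHKUploadHelper.finished() calls self._helper.upload_finished(), which
+ # is not shown in this excerpt; a minimal sketch would simply forget the
+ # active upload so a later upload_chk query for the same storage index
+ # starts fresh:
+ #
+ # def upload_finished(self, storage_index):
+ # del self._active_uploads[storage_index]
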
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet import threads # CLI tests use deferToThread
+from twisted.internet.error import ConnectionDone
from twisted.application import service
from allmydata import client, uri, download, upload, storage, mutable
from allmydata.introducer import IntroducerNode
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
-from foolscap.eventual import flushEventualQueue
+from foolscap.eventual import fireEventually, flushEventualQueue
+from foolscap import DeadReferenceError
from twisted.python import log
from twisted.python.failure import Failure
from twisted.web.client import getPage
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
helper_furl = f.read()
f.close()
+ self.helper_furl = helper_furl
f = open(os.path.join(basedirs[3],"helper.furl"), "w")
f.write(helper_furl)
f.close()
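+ # (the helper on client0 publishes its furl in private/helper.furl, and
+ # a client node reads BASEDIR/helper.furl at startup to locate its
+ # helper, so this copy points client3 at client0's helper)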
d.addCallback(_connected)
return d
- def add_extra_node(self, client_num):
- # this node is *not* parented to our self.sparent, so we can shut it
- # down separately from the rest, to exercise the connection-lost code
+ def add_extra_node(self, client_num, helper_furl=None,
+ add_to_sparent=False):
+ # usually this node is *not* parented to our self.sparent, so we can
+ # shut it down separately from the rest, to exercise the
+ # connection-lost code
basedir = self.getdir("client%d" % client_num)
if not os.path.isdir(basedir):
fileutil.make_dirs(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ if helper_furl:
+ f = open(os.path.join(basedir, "helper.furl"), "w")
+ f.write(helper_furl+"\n")
+ f.close()
c = client.Client(basedir=basedir)
self.clients.append(c)
self.numclients += 1
- c.startService()
+ if add_to_sparent:
+ c.setServiceParent(self.sparent)
+ else:
+ c.startService()
d = self.wait_for_connections()
d.addCallback(lambda res: c)
return d
return d1
d.addCallback(_download_nonexistent_uri)
+ # add a new node, which doesn't accept shares, and only uses the
+ # helper for upload.
+ d.addCallback(lambda res: self.add_extra_node(self.numclients,
+ self.helper_furl,
+ add_to_sparent=True))
+ def _added(extra_node):
+ self.extra_node = extra_node
+ extra_node.getServiceNamed("storageserver").sizelimit = 0
+ d.addCallback(_added)
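+ # (setting sizelimit=0 makes the new node's storage server refuse to
+ # accept any shares, so it participates purely as a client and uploads
+ # via the helper.furl we gave it)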
+
def _upload_with_helper(res):
DATA = "Data that needs help to upload" * 1000
u = upload.Data(DATA)
- d = self.clients[3].upload(u)
+ d = self.extra_node.upload(u)
def _uploaded(uri):
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
return d
d.addCallback(_upload_with_helper)
+ def _upload_resumable(res):
+ DATA = "Data that needs help to upload and gets interrupted" * 1000
+ u = upload.Data(DATA)
+ # interrupt the first upload after 5kB
+ print "GOING"
+ from allmydata.util import log
+ options = {"debug_interrupt": 5000,
+ "debug_stash_RemoteEncryptedUploadable": True,
+ }
+ # sneak into the helper and reduce its segment size, so that our
+ # debug_interrupt will sever the connection on about the fifth
+ # segment fetched. This makes sure that we've started to write
+ # the new shares before we abandon them, which exercises the
+ # abort/delete-partial-share code.
+ o2 = {"max_segment_size": 1000}
+ self.clients[0].getServiceNamed("helper")._chk_options = o2
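+ # (this reach-in works because Helper.remote_upload_chk passes the
+ # helper's _chk_options dict into each new CHKUploadHelper, where it
+ # becomes self._options)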
+
+ d = self.extra_node.upload(u, options)
+ def _eee(res):
+ log.msg("EEE: %s" % (res,))
+ print "EEE", res
+ d2 = defer.Deferred()
+ reactor.callLater(3, d2.callback, None)
+ return d2
+ #d.addBoth(_eee)
+ #return d
+
+ def _should_not_finish(res):
+ self.fail("interrupted upload should have failed, not finished"
+ " with result %s" % (res,))
+ def _interrupted(f):
+ print "interrupted"
+ log.msg("interrupted", level=log.WEIRD, failure=f)
+ f.trap(ConnectionDone, DeadReferenceError)
+ reu = options["RemoteEncryptedUploabable"]
+ print "REU.bytes", reu._bytes_read
+ # make sure we actually interrupted it before finishing the
+ # file
+ self.failUnless(reu._bytes_read < len(DATA),
+ "read %d out of %d total" % (reu._bytes_read,
+ len(DATA)))
+ log.msg("waiting for reconnect", level=log.WEIRD)
+ # now, we need to give the nodes a chance to notice that this
+ # connection has gone away. When this happens, the storage
+ # servers will be told to abort their uploads, removing the
+ # partial shares. Unfortunately this involves TCP messages
+ # going through the loopback interface, and we can't easily
+ # predict how long that will take. If it were all local, we
+ # could use fireEventually() to stall. Since we don't have
+ # the right introduction hooks, the best we can do is use a
+ # fixed delay. TODO: this is fragile.
+ return self.stall(None, 2.0)
+ d.addCallbacks(_should_not_finish, _interrupted)
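+ # (self.stall is assumed to be the usual test-harness helper, roughly:
+ # def stall(self, res, delay):
+ # d = defer.Deferred()
+ # reactor.callLater(delay, d.callback, res)
+ # return d
+ # )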
+
+ def _disconnected(res):
+ # check to make sure the storage servers aren't still hanging
+ # on to the partial share: their incoming/ directories should
+ # now be empty.
+ print "disconnected"
+ log.msg("disconnected", level=log.WEIRD)
+ for i in range(self.numclients):
+ incdir = os.path.join(self.getdir("client%d" % i),
+ "storage", "shares", "incoming")
+ self.failUnlessEqual(os.listdir(incdir), [])
+ d.addCallback(_disconnected)
+
+ def _wait_for_reconnect(res):
+ # then we need to give the reconnector a chance to
+ # reestablish the connection to the helper. Do the wait inside this
+ # callback (rather than adding new callbacks to d, which would land
+ # after _upload_again in the chain) so the upload doesn't restart
+ # before the connection is back.
+ log.msg("wait_for_connections", level=log.WEIRD)
+ return self.wait_for_connections()
+ d.addCallback(_wait_for_reconnect)
+ options2 = {"debug_stash_RemoteEncryptedUploadable": True}
+ def _upload_again(res):
+ print "uploading again"
+ log.msg("uploading again", level=log.WEIRD)
+ return self.extra_node.upload(u, options2)
+ d.addCallback(_upload_again)
+
+ def _uploaded(uri):
+ log.msg("I think its uploaded", level=log.WEIRD)
+ print "I tunk its uploaded", uri
+ reu = options2["RemoteEncryptedUploabable"]
+ print "REU.bytes", reu._bytes_read
+ # make sure we didn't read the whole file the second time
+ # around
+ self.failUnless(reu._bytes_read < len(DATA),
+ "resumption didn't save us any work: read %d bytes out of %d total" %
+ (reu._bytes_read, len(DATA)))
+ return self.downloader.download_to_data(uri)
+ d.addCallback(_uploaded)
+ def _check(newdata):
+ self.failUnlessEqual(newdata, DATA)
+ d.addCallback(_check)
+ return d
+ #d.addCallback(_upload_resumable)
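+ # _upload_resumable is deliberately left disabled here, presumably
+ # until the fragile fixed-delay wait noted above can be replaced with
+ # something deterministic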
+
return d
test_upload_and_download.timeout = 4800