+import os
from twisted.trial import unittest
from twisted.application import service
from foolscap.logging import log
from allmydata import upload, offloaded
-from allmydata.util import hashutil
+from allmydata.util import hashutil, fileutil
MiB = 1024*1024
# bogus host/port
t.setLocation("bogus:1234")
- self.helper = h = offloaded.Helper(".")
+ def setUpHelper(self, basedir):
+ fileutil.make_dirs(basedir)
+ self.helper = h = offloaded.Helper(basedir)
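+ # the helper is expected to create CHK_encoding/ and CHK_incoming/
+ # under basedir; the _check_empty callbacks below assert that both
+ # are empty again once an upload completes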
h.chk_upload_helper_class = CHKUploadHelper_fake
h.setServiceParent(self.s)
- self.helper_furl = t.registerReference(h)
+ self.helper_furl = self.tub.registerReference(h)
def tearDown(self):
d = self.s.stopService()
def test_one(self):
+ self.basedir = "helper/AssistedUpload/test_one"
+ self.setUpHelper(self.basedir)
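+ # (relative path: trial runs tests from its _trial_temp directory,
+ # so this basedir lands there rather than in the source tree)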
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
assert "CHK" in uri
d.addCallback(_uploaded)
+ def _check_empty(res):
+ files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
+ d.addCallback(_check_empty)
+
return d
def test_already_uploaded(self):
+ self.basedir = "helper/AssistedUpload/test_already_uploaded"
+ self.setUpHelper(self.basedir)
self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
assert "CHK" in uri
d.addCallback(_uploaded)
+ def _check_empty(res):
+ files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
+ d.addCallback(_check_empty)
+
return d
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone
from twisted.application import service
-from allmydata import client, uri, download, upload, storage, mutable
+from allmydata import client, uri, download, upload, storage, mutable, offloaded
from allmydata.introducer import IntroducerNode
from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
+from allmydata.util import log
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
from foolscap.eventual import flushEventualQueue
from foolscap import DeadReferenceError
-from twisted.python import log
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
enough to not fit inside a LIT uri.
"""
-class SmallSegmentDataUploadable(upload.Data):
- def __init__(self, max_segment_size, *args, **kwargs):
- self._max_segment_size = max_segment_size
- upload.Data.__init__(self, *args, **kwargs)
- def get_maximum_segment_size(self):
- return defer.succeed(self._max_segment_size)
-
class SystemTest(testutil.SignalMixin, unittest.TestCase):
def setUp(self):
# tail segment is not the same length as the others. This actually
# gets rounded up to 1025 to be a multiple of the number of
# required shares (since we use 25 out of 100 FEC).
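+ # (1025 is the smallest multiple of 25 at or above 1024: 41*25)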
- d1 = u.upload(SmallSegmentDataUploadable(1024, DATA))
+ up = upload.Data(DATA)
+ up.max_segment_size = 1024
+ d1 = u.upload(up)
return d1
d.addCallback(_do_upload)
def _upload_done(uri):
# the roothash), we have to do all of the encoding work, and only
# get to save on the upload part.
log.msg("UPLOADING AGAIN")
- d1 = self.uploader.upload(SmallSegmentDataUploadable(1024, DATA))
+ up = upload.Data(DATA)
+ up.max_segment_size = 1024
+ d1 = self.uploader.upload(up)
d.addCallback(_upload_again)
def _download_to_data(res):
def _upload_resumable(res):
DATA = "Data that needs help to upload and gets interrupted" * 1000
- u = upload.Data(DATA)
- # interrupt the first upload after 5kB
- print "GOING"
- from allmydata.util import log
- options = {"debug_interrupt": 5000,
- "debug_stash_RemoteEncryptedUploadable": True,
- }
- # sneak into the helper and reduce its segment size, so that our
+ u1 = upload.Data(DATA)
+ u2 = upload.Data(DATA)
+
+ # tell the upload to drop the connection after about 5kB
+ u1.debug_interrupt = 5000
+ u1.debug_stash_RemoteEncryptedUploadable = True
+ u2.debug_stash_RemoteEncryptedUploadable = True
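+ # the stashed wrapper is exposed afterwards as
+ # u1.debug_RemoteEncryptedUploadable (and likewise on u2), which
+ # lets the test check how many bytes each upload actually sent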
+ # sneak into the helper and reduce its chunk size, so that our
# debug_interrupt will sever the connection on about the fifth
- # segment fetched. This makes sure that we've started to write
- # the new shares before we abandon them, which exercises the
- # abort/delete-partial-share code.
- o2 = {"max_segment_size": 1000}
- self.clients[0].getServiceNamed("helper")._chk_options = o2
+ # chunk fetched. This makes sure that we've started to write the
+ # new shares before we abandon them, which exercises the
+ # abort/delete-partial-share code. TODO: find a cleaner way to do
+ # this. I know that this will affect later uses of the helper in
+ # this same test run, but I'm not currently worried about it.
+ offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
- d = self.extra_node.upload(u, options)
+ d = self.extra_node.upload(u1)
def _should_not_finish(res):
self.fail("interrupted upload should have failed, not finished"
" with result %s" % (res,))
def _interrupted(f):
- print "interrupted"
- log.msg("interrupted", level=log.WEIRD, failure=f)
f.trap(ConnectionDone, DeadReferenceError)
- reu = options["RemoteEncryptedUploadable"]
- print "REU.bytes", reu._bytes_read
+ reu = u1.debug_RemoteEncryptedUploadable
# make sure we actually interrupted it before finishing the
# file
- self.failUnless(reu._bytes_read < len(DATA),
- "read %d out of %d total" % (reu._bytes_read,
+ self.failUnless(reu._bytes_sent < len(DATA),
+ "read %d out of %d total" % (reu._bytes_sent,
len(DATA)))
- log.msg("waiting for reconnect", level=log.WEIRD)
+ log.msg("waiting for reconnect", level=log.NOISY,
+ facility="tahoe.test.test_system")
# now, we need to give the nodes a chance to notice that this
# connection has gone away. When this happens, the storage
# servers will be told to abort their uploads, removing the
# check to make sure the storage servers aren't still hanging
# on to the partial share: their incoming/ directories should
# now be empty.
- print "disconnected"
- log.msg("disconnected", level=log.WEIRD)
+ log.msg("disconnected", level=log.NOISY,
+ facility="tahoe.test.test_system")
for i in range(self.numclients):
incdir = os.path.join(self.getdir("client%d" % i),
"storage", "shares", "incoming")
def _wait_for_reconnect(res):
# then we need to give the reconnector a chance to
# reestablish the connection to the helper.
- d.addCallback(lambda res: log.msg("wait_for_connections",
- level=log.WEIRD))
+ d.addCallback(lambda res:
+ log.msg("wait_for_connections", level=log.NOISY,
+ facility="tahoe.test.test_system"))
d.addCallback(lambda res: self.wait_for_connections())
d.addCallback(_wait_for_reconnect)
- options2 = {"debug_stash_RemoteEncryptedUploadable": True}
+
def _upload_again(res):
- print "uploading again"
- log.msg("uploading again", level=log.WEIRD)
- return self.extra_node.upload(u, options2)
+ log.msg("uploading again", level=log.NOISY,
+ facility="tahoe.test.test_system")
+ return self.extra_node.upload(u2)
d.addCallbacks(_upload_again)
def _uploaded(uri):
- log.msg("I think its uploaded", level=log.WEIRD)
- print "I tunk its uploaded", uri
- reu = options2["RemoteEncryptedUploadable"]
- print "REU.bytes", reu._bytes_read
+ log.msg("Second upload complete", level=log.NOISY,
+ facility="tahoe.test.test_system")
+ reu = u2.debug_RemoteEncryptedUploadable
# make sure we didn't read the whole file the second time
# around
- #self.failUnless(reu._bytes_read < len(DATA),
- # "resumption didn't save us any work:"
- # " read %d bytes out of %d total" %
- # (reu._bytes_read, len(DATA)))
+ self.failUnless(reu._bytes_sent < len(DATA),
+ "resumption didn't save us any work:"
+ " read %d bytes out of %d total" %
+ (reu._bytes_sent, len(DATA)))
return self.downloader.download_to_data(uri)
d.addCallback(_uploaded)
+
def _check(newdata):
self.failUnlessEqual(newdata, DATA)
+ # also check that the helper has removed the temp file from
+ # its directories
+ basedir = os.path.join(self.getdir("client0"), "helper")
+ files = os.listdir(os.path.join(basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
d.addCallback(_check)
return d
- #d.addCallback(_upload_resumable)
+ d.addCallback(_upload_resumable)
return d
test_upload_and_download.timeout = 4800