-
+import os
+from twisted.internet import defer
from twisted.trial import unittest
from twisted.application import service
-from foolscap import Tub, eventual
-from foolscap.logging import log
+from foolscap.api import Tub, fireEventually, flushEventualQueue
+
+from allmydata.storage.server import si_b2a
+from allmydata.storage_client import StorageFarmBroker
+from allmydata.immutable import offloaded, upload
+from allmydata import uri, client
+from allmydata.util import hashutil, fileutil, mathutil
+from pycryptopp.cipher.aes import AES
+
+MiB = 1024*1024
-from allmydata import upload, offloaded
-from allmydata.util import hashutil
+DATA = "I need help\n" * 1000
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
def start_encrypted(self, eu):
- needed_shares, happy, total_shares = self._encoding_parameters
d = eu.get_size()
def _got_size(size):
- return (hashutil.uri_extension_hash(""),
- needed_shares, total_shares, size)
+ d2 = eu.get_all_encoding_parameters()
+ def _got_parms(parms):
+ # just pretend we did the upload
+ needed_shares, happy, total_shares, segsize = parms
+ ueb_data = {"needed_shares": needed_shares,
+ "total_shares": total_shares,
+ "segment_size": segsize,
+ "size": size,
+ }
+ ueb_hash = "fake"
+ v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+ needed_shares, total_shares, size)
+ _UR = upload.UploadResults
+ ur = _UR(file_size=size,
+ ciphertext_fetched=0,
+ preexisting_shares=0,
+ pushed_shares=total_shares,
+ sharemap={},
+ servermap={},
+ timings={},
+ uri_extension_data=ueb_data,
+ uri_extension_hash=ueb_hash,
+ verifycapstr=v.to_string())
+ self._upload_status.set_results(ur)
+ return ur
+ d2.addCallback(_got_parms)
+ return d2
d.addCallback(_got_size)
return d
-class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
- def start(self):
- res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
- return (res, None)
+class Helper_fake_upload(offloaded.Helper):
+ def _make_chk_upload_helper(self, storage_index, lp):
+ si_s = si_b2a(storage_index)
+ incoming_file = os.path.join(self._chk_incoming, si_s)
+ encoding_file = os.path.join(self._chk_encoding, si_s)
+ uh = CHKUploadHelper_fake(storage_index, self,
+ self._storage_broker,
+ self._secret_holder,
+ incoming_file, encoding_file,
+ lp)
+ return uh
+
+class Helper_already_uploaded(Helper_fake_upload):
+ def _check_chk(self, storage_index, lp):
+ res = upload.HelperUploadResults()
+ res.uri_extension_hash = hashutil.uri_extension_hash("")
+
+ # we're pretending that the file they're trying to upload was already
+ # present in the grid. We return some information about the file, so
+ # the client can decide if they like the way it looks. The parameters
+ # used here are chosen to match the defaults.
+ PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
+ ueb_data = {"needed_shares": PARAMS["k"],
+ "total_shares": PARAMS["n"],
+ "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
+ "size": len(DATA),
+ }
+ res.uri_extension_data = ueb_data
+ return defer.succeed(res)
class FakeClient(service.MultiService):
- def log(self, *args, **kwargs):
- return log.msg(*args, **kwargs)
- def get_push_to_ourselves(self):
- return True
+ DEFAULT_ENCODING_PARAMETERS = {"k":25,
+ "happy": 75,
+ "n": 100,
+ "max_segment_size": 1*MiB,
+ }
+
def get_encoding_parameters(self):
- return None
+ return self.DEFAULT_ENCODING_PARAMETERS
+ def get_storage_broker(self):
+ return self.storage_broker
def flush_but_dont_ignore(res):
- d = eventual.flushEventualQueue()
+ d = flushEventualQueue()
def _done(ignored):
return res
d.addCallback(_done)
return d
+def wait_a_few_turns(ignored=None):
+ d = fireEventually()
+ d.addCallback(fireEventually)
+ d.addCallback(fireEventually)
+ d.addCallback(fireEventually)
+ d.addCallback(fireEventually)
+ d.addCallback(fireEventually)
+ return d
+
+def upload_data(uploader, data, convergence):
+ u = upload.Data(data, convergence=convergence)
+ return uploader.upload(u)
+
class AssistedUpload(unittest.TestCase):
+ timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self):
self.s = FakeClient()
+ self.s.storage_broker = StorageFarmBroker(None, True, 0, None)
+ self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService()
self.tub = t = Tub()
+ t.setOption("expose-remote-exception-types", False)
t.setServiceParent(self.s)
self.s.tub = t
# we never actually use this for network traffic, so it can use a
# bogus host/port
t.setLocation("bogus:1234")
- self.helper = h = offloaded.Helper(".")
- h.chk_upload_helper_class = CHKUploadHelper_fake
- h.setServiceParent(self.s)
- self.helper_furl = t.registerReference(h)
+ def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
+ fileutil.make_dirs(basedir)
+ self.helper = h = helper_class(basedir,
+ self.s.storage_broker,
+ self.s.secret_holder,
+ None, None)
+ self.helper_furl = self.tub.registerReference(h)
def tearDown(self):
d = self.s.stopService()
- d.addCallback(eventual.fireEventually)
+ d.addCallback(fireEventually)
d.addBoth(flush_but_dont_ignore)
return d
def test_one(self):
+ self.basedir = "helper/AssistedUpload/test_one"
+ self.setUpHelper(self.basedir)
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
- # wait a few turns
- d = eventual.fireEventually()
- d.addCallback(eventual.fireEventually)
- d.addCallback(eventual.fireEventually)
+ d = wait_a_few_turns()
def _ready(res):
assert u._helper
- DATA = "I need help\n" * 1000
- return u.upload_data(DATA)
+ return upload_data(u, DATA, convergence="some convergence string")
d.addCallback(_ready)
- def _uploaded(uri):
- assert "CHK" in uri
+ def _uploaded(results):
+ the_uri = results.get_uri()
+ assert "CHK" in the_uri
d.addCallback(_uploaded)
+ def _check_empty(res):
+ files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
+ d.addCallback(_check_empty)
+
return d
+ def test_previous_upload_failed(self):
+ self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
+ self.setUpHelper(self.basedir)
+
+ # we want to make sure that an upload which fails (leaving the
+ # ciphertext in the CHK_encoding/ directory) does not prevent a later
+ # attempt to upload that file from working. We simulate this by
+ # populating the directory manually. The hardest part is guessing the
+ # storage index.
+
+ k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
+ n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
+ max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
+ segsize = min(max_segsize, len(DATA))
+ # this must be a multiple of 'required_shares'==k
+ segsize = mathutil.next_multiple(segsize, k)
+
+ key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
+ assert len(key) == 16
+ encryptor = AES(key)
+ SI = hashutil.storage_index_hash(key)
+ SI_s = si_b2a(SI)
+ encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
+ f = open(encfile, "wb")
+ f.write(encryptor.process(DATA))
+ f.close()
+
+ u = upload.Uploader(self.helper_furl)
+ u.setServiceParent(self.s)
+
+ d = wait_a_few_turns()
+
+ def _ready(res):
+ assert u._helper
+ return upload_data(u, DATA, convergence="test convergence string")
+ d.addCallback(_ready)
+ def _uploaded(results):
+ the_uri = results.get_uri()
+ assert "CHK" in the_uri
+ d.addCallback(_uploaded)
+
+ def _check_empty(res):
+ files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
+ d.addCallback(_check_empty)
+
+ return d
def test_already_uploaded(self):
- self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
+ self.basedir = "helper/AssistedUpload/test_already_uploaded"
+ self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)
- # wait a few turns
- d = eventual.fireEventually()
- d.addCallback(eventual.fireEventually)
- d.addCallback(eventual.fireEventually)
+ d = wait_a_few_turns()
def _ready(res):
assert u._helper
- DATA = "I need help\n" * 1000
- return u.upload_data(DATA)
+ return upload_data(u, DATA, convergence="some convergence string")
d.addCallback(_ready)
- def _uploaded(uri):
- assert "CHK" in uri
+ def _uploaded(results):
+ the_uri = results.get_uri()
+ assert "CHK" in the_uri
d.addCallback(_uploaded)
+ def _check_empty(res):
+ files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+ self.failUnlessEqual(files, [])
+ files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+ self.failUnlessEqual(files, [])
+ d.addCallback(_check_empty)
+
return d