self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
self._finished_observers = observer.OneShotObserverList()
+ self._started = time.time()
d = self._fetcher.when_done()
d.addCallback(lambda res: self._reader.start())
d.addCallback(lambda res: self.start_encrypted(self._reader))
kwargs['facility'] = "tahoe.helper.chk"
return upload.CHKUploader.log(self, *args, **kwargs)
- def start(self):
- self._started = time.time()
- # determine if we need to upload the file. If so, return ({},self) .
- # If not, return (UploadResults,None) .
+ def remote_get_version(self):
+ return self.VERSION
+
+ def remote_upload(self, reader):
+ # reader is an RIEncryptedUploadable. I am specified to return an
+ # UploadResults dictionary.
+
+ # Log how much of the ciphertext is already on disk, which tells us
+ # how much we still need to fetch.
self.log("deciding whether to upload the file or not", level=log.NOISY)
if os.path.exists(self._encoding_file):
# we have the whole file, and we might be encoding it (or the
# encode/upload might have failed, and we need to restart it).
self.log("ciphertext already in place", level=log.UNUSUAL)
- return (self._results, self)
- if os.path.exists(self._incoming_file):
+ elif os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
- return (self._results, self)
- # we don't remember uploading this file
- self.log("no ciphertext yet", level=log.NOISY)
- return (self._results, self)
-
- def remote_get_version(self):
- return self.VERSION
-
- def remote_upload(self, reader):
- # reader is an RIEncryptedUploadable. I am specified to return an
- # UploadResults dictionary.
+ else:
+ # we don't remember uploading this file
+ self.log("no ciphertext yet", level=log.NOISY)
# let our fetcher pull ciphertext from the reader.
self._fetcher.add_reader(reader)
{ },
"application-version": str(allmydata.__full_version__),
}
- chk_upload_helper_class = CHKUploadHelper
MAX_UPLOAD_STATUSES = 10
def __init__(self, basedir, storage_broker, secret_holder,
def remote_upload_chk(self, storage_index):
self.count("chk_upload_helper.upload_requests")
r = upload.UploadResults()
- si_s = si_b2a(storage_index)
- lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
- incoming_file = os.path.join(self._chk_incoming, si_s)
- encoding_file = os.path.join(self._chk_encoding, si_s)
+ lp = self.log(format="helper: upload_chk query for SI %(si)s",
+ si=si_b2a(storage_index))
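+ # We are expected to return (UploadResults, None) if the file is
+ # already in the grid, or (None, CHKUploadHelper) if the caller still
+ # needs to upload it.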
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
- return uh.start()
-
- d = self._check_for_chk_already_in_grid(storage_index, r, lp)
- def _checked(already_present):
- if already_present:
- # the necessary results are placed in the UploadResults
- self.count("chk_upload_helper.upload_already_present")
- self.log("file already found in grid", parent=lp)
- return (r, None)
-
- self.count("chk_upload_helper.upload_need_upload")
- # the file is not present in the grid, by which we mean there are
- # less than 'N' shares available.
- self.log("unable to find file in the grid", parent=lp,
- level=log.NOISY)
- # We need an upload helper. Check our active uploads again in
- # case there was a race.
- if storage_index in self._active_uploads:
- self.log("upload is currently active", parent=lp)
- uh = self._active_uploads[storage_index]
- else:
- self.log("creating new upload helper", parent=lp)
- uh = self.chk_upload_helper_class(storage_index, self,
- self._storage_broker,
- self._secret_holder,
- incoming_file, encoding_file,
- r, lp)
- self._active_uploads[storage_index] = uh
- self._add_upload(uh)
- return uh.start()
- d.addCallback(_checked)
+ return (None, uh)
+
+ d = self._check_chk(storage_index, r, lp)
+ d.addCallback(self._did_chk_check, storage_index, r, lp)
def _err(f):
self.log("error while checking for chk-already-in-grid",
failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
d.addErrback(_err)
return d
- def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+ def _check_chk(self, storage_index, results, lp):
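+ # Returns a Deferred that fires with a populated UploadResults if the
+ # file is already in the grid, or with None if it must be uploaded.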
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
results.uri_extension_data = ueb_data
results.preexisting_shares = len(sharemap)
results.pushed_shares = 0
- return True
- return False
+ return results
+ return None
d.addCallback(_checked)
return d
+ def _did_chk_check(self, already_present, storage_index, r, lp):
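+ # 'already_present' is the UploadResults returned by _check_chk, or
+ # None if the file was not found in the grid.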
+ if already_present:
+ # the necessary results are placed in the UploadResults
+ self.count("chk_upload_helper.upload_already_present")
+ self.log("file already found in grid", parent=lp)
+ return (already_present, None)
+
+ self.count("chk_upload_helper.upload_need_upload")
+ # the file is not present in the grid, by which we mean there are
+ # fewer than 'N' shares available.
+ self.log("unable to find file in the grid", parent=lp,
+ level=log.NOISY)
+ # We need an upload helper. Check our active uploads again in
+ # case there was a race.
+ if storage_index in self._active_uploads:
+ self.log("upload is currently active", parent=lp)
+ uh = self._active_uploads[storage_index]
+ else:
+ self.log("creating new upload helper", parent=lp)
+ uh = self._make_chk_upload_helper(storage_index, r, lp)
+ self._active_uploads[storage_index] = uh
+ self._add_upload(uh)
+ return (None, uh)
+
+ def _make_chk_upload_helper(self, storage_index, r, lp):
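+ # Build a CHKUploadHelper for this storage index. This is a separate
+ # method so that tests can override it and substitute a fake helper.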
+ si_s = si_b2a(storage_index)
+ incoming_file = os.path.join(self._chk_incoming, si_s)
+ encoding_file = os.path.join(self._chk_encoding, si_s)
+ uh = CHKUploadHelper(storage_index, self,
+ self._storage_broker,
+ self._secret_holder,
+ incoming_file, encoding_file,
+ r, lp)
+ return uh
+
def _add_upload(self, uh):
self._all_uploads[uh] = None
if self._history:
import os
+from twisted.internet import defer
+from allmydata.storage.common import si_b2a
from twisted.trial import unittest
from twisted.application import service
d.addCallback(_got_size)
return d
-class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
- def start(self):
+class Helper_fake_upload(offloaded.Helper):
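+ # A Helper that builds CHKUploadHelper_fake instances instead of real
+ # ones, replacing the old per-instance chk_upload_helper_class override.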
+ def _make_chk_upload_helper(self, storage_index, r, lp):
+ si_s = si_b2a(storage_index)
+ incoming_file = os.path.join(self._chk_incoming, si_s)
+ encoding_file = os.path.join(self._chk_encoding, si_s)
+ uh = CHKUploadHelper_fake(storage_index, self,
+ self._storage_broker,
+ self._secret_holder,
+ incoming_file, encoding_file,
+ r, lp)
+ return uh
+
+class Helper_already_uploaded(Helper_fake_upload):
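+ # Pretends the file is already present in the grid: _check_chk returns
+ # a pre-populated UploadResults without querying any servers.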
+ def _check_chk(self, storage_index, results, lp):
res = upload.UploadResults()
res.uri_extension_hash = hashutil.uri_extension_hash("")
"size": len(DATA),
}
res.uri_extension_data = ueb_data
- return (res, None)
+ return defer.succeed(res)
class FakeClient(service.MultiService):
DEFAULT_ENCODING_PARAMETERS = {"k":25,
# bogus host/port
t.setLocation("bogus:1234")
- def setUpHelper(self, basedir):
+ def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
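+ # helper_class lets individual tests substitute a different Helper
+ # subclass (e.g. Helper_already_uploaded).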
fileutil.make_dirs(basedir)
- self.helper = h = offloaded.Helper(basedir,
- self.storage_broker,
- self.secret_holder,
- None, None)
- h.chk_upload_helper_class = CHKUploadHelper_fake
+ self.helper = h = helper_class(basedir,
+ self.storage_broker,
+ self.secret_holder,
+ None, None)
self.helper_furl = self.tub.registerReference(h)
def tearDown(self):
def test_already_uploaded(self):
self.basedir = "helper/AssistedUpload/test_already_uploaded"
- self.setUpHelper(self.basedir)
- self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
+ self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
u = upload.Uploader(self.helper_furl)
u.setServiceParent(self.s)