def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
- results, log_number):
+ log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
self._upload_id = si_b2a(storage_index)[:5]
self._log_number = log_number
- self._results = results
self._upload_status = upload.UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_storage_index(storage_index)
# and inform the client when the upload has finished
return self._finished_observers.when_fired()
- def _finished(self, uploadresults):
- precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
- assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
- r = uploadresults
- v = uri.from_string(r.verifycapstr)
- r.uri_extension_hash = v.uri_extension_hash
+ def _finished(self, ur):
+ precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr)
+ assert interfaces.IUploadResults.providedBy(ur), ur
+ v = uri.from_string(ur.verifycapstr)
f_times = self._fetcher.get_times()
- r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
- r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
- r.timings["total_fetch"] = f_times["total"]
+
+ hur = upload.HelperUploadResults()
+ hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
+ "total_fetch": f_times["total"],
+ }
+ for k in ur.timings:
+ hur.timings[k] = ur.timings[k]
+ hur.uri_extension_hash = v.uri_extension_hash
+ hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+ hur.preexisting_shares = ur.preexisting_shares
+ hur.sharemap = ur.sharemap
+ hur.servermap = ur.servermap
+ hur.pushed_shares = ur.pushed_shares
+ hur.file_size = ur.file_size
+ hur.uri_extension_data = ur.uri_extension_data
+ hur.verifycapstr = ur.verifycapstr
+
self._reader.close()
os.unlink(self._encoding_file)
- self._finished_observers.fire(r)
+ self._finished_observers.fire(hur)
self._helper.upload_finished(self._storage_index, v.size)
del self._reader
def remote_upload_chk(self, storage_index):
self.count("chk_upload_helper.upload_requests")
- r = upload.UploadResults()
lp = self.log(format="helper: upload_chk query for SI %(si)s",
si=si_b2a(storage_index))
if storage_index in self._active_uploads:
uh = self._active_uploads[storage_index]
return (None, uh)
- d = self._check_chk(storage_index, r, lp)
- d.addCallback(self._did_chk_check, storage_index, r, lp)
+ d = self._check_chk(storage_index, lp)
+ d.addCallback(self._did_chk_check, storage_index, lp)
def _err(f):
self.log("error while checking for chk-already-in-grid",
failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
d.addErrback(_err)
return d
- def _check_chk(self, storage_index, results, lp):
+ def _check_chk(self, storage_index, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
- results.uri_extension_hash = ueb_hash
- results.sharemap = sharemap
- results.uri_extension_data = ueb_data
- results.preexisting_shares = len(sharemap)
- results.pushed_shares = 0
- return results
+ hur = upload.HelperUploadResults()
+ hur.uri_extension_hash = ueb_hash
+ hur.sharemap = sharemap
+ hur.uri_extension_data = ueb_data
+ hur.preexisting_shares = len(sharemap)
+ hur.pushed_shares = 0
+ return hur
return None
d.addCallback(_checked)
return d
- def _did_chk_check(self, already_present, storage_index, r, lp):
+ def _did_chk_check(self, already_present, storage_index, lp):
if already_present:
# the necessary results are placed in the UploadResults
self.count("chk_upload_helper.upload_already_present")
uh = self._active_uploads[storage_index]
else:
self.log("creating new upload helper", parent=lp)
- uh = self._make_chk_upload_helper(storage_index, r, lp)
+ uh = self._make_chk_upload_helper(storage_index, lp)
self._active_uploads[storage_index] = uh
self._add_upload(uh)
return (None, uh)
- def _make_chk_upload_helper(self, storage_index, r, lp):
+ def _make_chk_upload_helper(self, storage_index, lp):
si_s = si_b2a(storage_index)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
self._storage_broker,
self._secret_holder,
incoming_file, encoding_file,
- r, lp)
+ lp)
return uh
def _add_upload(self, uh):
class TooFullError(Exception):
pass
-class UploadResults(Copyable, RemoteCopy):
- implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
# note: don't change this string, it needs to match the value used on the
# helper, and it does *not* need to match the fully-qualified
# package/module/class name
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
+class UploadResults:
+ implements(IUploadResults)
+
+ def __init__(self):
+ self.timings = {} # dict of name to number of seconds
+ self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+ self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
+ self.file_size = None
+ self.ciphertext_fetched = None # how much the helper fetched
+ self.uri = None
+ self.preexisting_shares = None # count of shares already present
+ self.pushed_shares = None # count of shares we pushed
+
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
d.addCallback(self._contacted_helper)
return d
- def _contacted_helper(self, (upload_results, upload_helper)):
+ def _contacted_helper(self, (helper_upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
return d
self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
- return upload_results
+ return helper_upload_results
def _convert_old_upload_results(self, upload_results):
# pre-1.3.0 helpers return upload results which contain a mapping
if str in [type(v) for v in sharemap.values()]:
upload_results.sharemap = None
- def _build_verifycap(self, upload_results):
+ def _build_verifycap(self, helper_upload_results):
self.log("upload finished, building readcap", level=log.OPERATIONAL)
- self._convert_old_upload_results(upload_results)
+ self._convert_old_upload_results(helper_upload_results)
self._upload_status.set_status("Building Readcap")
- r = upload_results
- assert r.uri_extension_data["needed_shares"] == self._needed_shares
- assert r.uri_extension_data["total_shares"] == self._total_shares
- assert r.uri_extension_data["segment_size"] == self._segment_size
- assert r.uri_extension_data["size"] == self._size
- r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
- uri_extension_hash=r.uri_extension_hash,
- needed_shares=self._needed_shares,
- total_shares=self._total_shares, size=self._size
- ).to_string()
+ hur = helper_upload_results
+ assert hur.uri_extension_data["needed_shares"] == self._needed_shares
+ assert hur.uri_extension_data["total_shares"] == self._total_shares
+ assert hur.uri_extension_data["segment_size"] == self._segment_size
+ assert hur.uri_extension_data["size"] == self._size
+ ur = UploadResults()
+ # hur.verifycap doesn't exist if already found
+ v = uri.CHKFileVerifierURI(self._storage_index,
+ uri_extension_hash=hur.uri_extension_hash,
+ needed_shares=self._needed_shares,
+ total_shares=self._total_shares,
+ size=self._size)
+ ur.verifycapstr = v.to_string()
+ ur.timings = hur.timings
+ ur.uri_extension_data = hur.uri_extension_data
+ ur.uri_extension_hash = hur.uri_extension_hash
+ ur.preexisting_shares = hur.preexisting_shares
+ ur.pushed_shares = hur.pushed_shares
+ ur.sharemap = hur.sharemap
+ ur.servermap = hur.servermap # not if already found
+ ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found
now = time.time()
- r.file_size = self._size
- r.timings["storage_index"] = self._storage_index_elapsed
- r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
- if "total" in r.timings:
- r.timings["helper_total"] = r.timings["total"]
- r.timings["total"] = now - self._started
+ ur.file_size = self._size
+ ur.timings["storage_index"] = self._storage_index_elapsed
+ ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
+ if "total" in ur.timings:
+ ur.timings["helper_total"] = ur.timings["total"]
+ ur.timings["total"] = now - self._started
self._upload_status.set_status("Finished")
- self._upload_status.set_results(r)
- return r
+ self._upload_status.set_results(ur)
+ return ur
def get_upload_status(self):
return self._upload_status
def _got_size(size):
d2 = eu.get_all_encoding_parameters()
def _got_parms(parms):
+ # just pretend we did the upload
needed_shares, happy, total_shares, segsize = parms
ueb_data = {"needed_shares": needed_shares,
"total_shares": total_shares,
"segment_size": segsize,
"size": size,
}
- self._results.uri_extension_data = ueb_data
- self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
- needed_shares, total_shares,
- size).to_string()
- return self._results
+
+ r = upload.UploadResults()
+ r.preexisting_shares = 0
+ r.pushed_shares = total_shares
+ r.file_size = size
+ r.uri_extension_data = ueb_data
+ v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+ needed_shares, total_shares,
+ size)
+ r.verifycapstr = v.to_string()
+ return r
d2.addCallback(_got_parms)
return d2
d.addCallback(_got_size)
return d
class Helper_fake_upload(offloaded.Helper):
- def _make_chk_upload_helper(self, storage_index, r, lp):
+ def _make_chk_upload_helper(self, storage_index, lp):
si_s = si_b2a(storage_index)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
self._storage_broker,
self._secret_holder,
incoming_file, encoding_file,
- r, lp)
+ lp)
return uh
class Helper_already_uploaded(Helper_fake_upload):
- def _check_chk(self, storage_index, results, lp):
- res = upload.UploadResults()
+ def _check_chk(self, storage_index, lp):
+ res = upload.HelperUploadResults()
res.uri_extension_hash = hashutil.uri_extension_hash("")
# we're pretending that the file they're trying to upload was already