-import os.path, stat
+import os.path, stat, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
def __init__(self, storage_index, helper,
incoming_file, encoding_file,
- log_number):
+ results, log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
upload_id = idlib.b2a(storage_index)[:6]
self._log_number = log_number
+ self._results = results
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
parent=log_number)
return upload.CHKUploader.log(self, *args, **kwargs)
def start(self):
+ self._started = time.time()
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .
self.log("deciding whether to upload the file or not", level=log.NOISY)
# we have the whole file, and we might be encoding it (or the
# encode/upload might have failed, and we need to restart it).
self.log("ciphertext already in place", level=log.UNUSUAL)
- return ({}, self)
+ return (self._results, self)
if os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
- return ({}, self)
+ return (self._results, self)
# we don't remember uploading this file
self.log("no ciphertext yet", level=log.NOISY)
- return ({}, self)
+ return (self._results, self)
def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
def _finished(self, res):
(uri_extension_hash, needed_shares, total_shares, size) = res
- upload_results = {'uri_extension_hash': uri_extension_hash}
+ r = self._results
+ r.uri_extension_hash = uri_extension_hash
+ f_times = self._fetcher.get_times()
+ r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
+ r.timings["total_fetch"] = f_times["total"]
self._reader.close()
os.unlink(self._encoding_file)
- self._finished_observers.fire(upload_results)
+ self._finished_observers.fire(r)
self._helper.upload_finished(self._storage_index)
del self._reader
self._readers = []
self._started = False
self._f = None
+ self._times = {
+ "cumulative_fetch": 0.0,
+ "total": 0.0,
+ }
def log(self, *args, **kwargs):
if "facility" not in kwargs:
if self._started:
return
self._started = True
+ started = time.time()
if os.path.exists(self._encoding_file):
self.log("ciphertext already present, bypassing fetch",
# else.
have = os.stat(self._encoding_file)[stat.ST_SIZE]
d = self.call("read_encrypted", have-1, 1)
- d.addCallback(lambda ignored: self._done2())
+ d.addCallback(self._done2, started)
return
# first, find out how large the file is going to be
d.addCallback(self._got_size)
d.addCallback(self._start_reading)
d.addCallback(self._done)
+ d.addCallback(self._done2, started)
d.addErrback(self._failed)
def _got_size(self, size):
# transfer that involves more than a few hundred chunks.
# 'fire_when_done' lives a long time, but the Deferreds returned by
# the inner _fetch() call do not.
+ start = time.time()
d = defer.maybeDeferred(self._fetch)
def _done(finished):
+ elapsed = time.time() - start
+ self._times["cumulative_fetch"] += elapsed
if finished:
self.log("finished reading ciphertext", level=log.NOISY)
fire_when_done.callback(None)
size=os.stat(self._incoming_file)[stat.ST_SIZE],
level=log.NOISY)
os.rename(self._incoming_file, self._encoding_file)
- return self._done2()
- def _done2(self):
+ def _done2(self, _ignored, started):
self.log("done2", level=log.NOISY)
+ elapsed = time.time() - started
+ self._times["total"] = elapsed
self._readers = []
self._done_observers.fire(None)
def when_done(self):
return self._done_observers.when_fired()
+ def get_times(self):
+ return self._times
class LocalCiphertextReader(AskUntilSuccessMixin):
return self.parent.log(*args, **kwargs)
def remote_upload_chk(self, storage_index):
+ r = upload.UploadResults()
+ started = time.time()
si_s = idlib.b2a(storage_index)
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
incoming_file = os.path.join(self._chk_incoming, si_s)
uh = self._active_uploads[storage_index]
return uh.start()
- d = self._check_for_chk_already_in_grid(storage_index, lp)
- def _checked(upload_results):
- if upload_results:
+ d = self._check_for_chk_already_in_grid(storage_index, r, lp)
+ def _checked(already_present):
+ elapsed = time.time() - started
+ r.timings['existence_check'] = elapsed
+ if already_present:
+ # the necessary results are placed in the UploadResults
self.log("file already found in grid", parent=lp)
- return (upload_results, None)
+ return (r, None)
# the file is not present in the grid, by which we mean there are
# less than 'N' shares available.
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
- lp)
+ r, lp)
self._active_uploads[storage_index] = uh
return uh.start()
d.addCallback(_checked)
d.addErrback(_err)
return d
- def _check_for_chk_already_in_grid(self, storage_index, lp):
+ def _check_for_chk_already_in_grid(self, storage_index, results, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
- upload_results = {'uri_extension_hash': ueb_hash}
- return upload_results
+ results.uri_extension_hash = ueb_hash
+ return True
return False
d.addCallback(_checked)
return d