def get_times(self):
# return a dictionary of encode+push timings
return self._times
- def get_rates(self):
- # return a dictionary of encode+push speeds
- rates = {
- "encode": self.file_size / self._times["cumulative_encoding"],
- "push": self.file_size / self._times["cumulative_sending"],
- }
- return rates
return DictOf(Nodeid, float)
-UploadResults = DictOf(str, str)
+UploadResults = Any() #DictOf(str, str)
class RIEncryptedUploadable(RemoteInterface):
__remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com"
-import os.path, stat
+import os.path, stat, time
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
def __init__(self, storage_index, helper,
incoming_file, encoding_file,
- log_number):
+ results, log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
upload_id = idlib.b2a(storage_index)[:6]
self._log_number = log_number
+ self._results = results
self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
parent=log_number)
return upload.CHKUploader.log(self, *args, **kwargs)
def start(self):
+ self._started = time.time()
# determine if we need to upload the file. If so, return ({},self) .
# If not, return (UploadResults,None) .
self.log("deciding whether to upload the file or not", level=log.NOISY)
# we have the whole file, and we might be encoding it (or the
# encode/upload might have failed, and we need to restart it).
self.log("ciphertext already in place", level=log.UNUSUAL)
- return ({}, self)
+ return (self._results, self)
if os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
- return ({}, self)
+ return (self._results, self)
# we don't remember uploading this file
self.log("no ciphertext yet", level=log.NOISY)
- return ({}, self)
+ return (self._results, self)
def remote_upload(self, reader):
# reader is an RIEncryptedUploadable. I am specified to return an
def _finished(self, res):
(uri_extension_hash, needed_shares, total_shares, size) = res
- upload_results = {'uri_extension_hash': uri_extension_hash}
+ r = self._results
+ r.uri_extension_hash = uri_extension_hash
+ f_times = self._fetcher.get_times()
+ r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
+ r.timings["total_fetch"] = f_times["total"]
self._reader.close()
os.unlink(self._encoding_file)
- self._finished_observers.fire(upload_results)
+ self._finished_observers.fire(r)
self._helper.upload_finished(self._storage_index)
del self._reader
self._readers = []
self._started = False
self._f = None
+ self._times = {
+ "cumulative_fetch": 0.0,
+ "total": 0.0,
+ }
def log(self, *args, **kwargs):
if "facility" not in kwargs:
if self._started:
return
self._started = True
+ started = time.time()
if os.path.exists(self._encoding_file):
self.log("ciphertext already present, bypassing fetch",
# else.
have = os.stat(self._encoding_file)[stat.ST_SIZE]
d = self.call("read_encrypted", have-1, 1)
- d.addCallback(lambda ignored: self._done2())
+ d.addCallback(self._done2, started)
return
# first, find out how large the file is going to be
d.addCallback(self._got_size)
d.addCallback(self._start_reading)
d.addCallback(self._done)
+ d.addCallback(self._done2, started)
d.addErrback(self._failed)
def _got_size(self, size):
# transfer that involves more than a few hundred chunks.
# 'fire_when_done' lives a long time, but the Deferreds returned by
# the inner _fetch() call do not.
+ start = time.time()
d = defer.maybeDeferred(self._fetch)
def _done(finished):
+ elapsed = time.time() - start
+ self._times["cumulative_fetch"] += elapsed
if finished:
self.log("finished reading ciphertext", level=log.NOISY)
fire_when_done.callback(None)
size=os.stat(self._incoming_file)[stat.ST_SIZE],
level=log.NOISY)
os.rename(self._incoming_file, self._encoding_file)
- return self._done2()
- def _done2(self):
+ def _done2(self, _ignored, started):
self.log("done2", level=log.NOISY)
+ elapsed = time.time() - started
+ self._times["total"] = elapsed
self._readers = []
self._done_observers.fire(None)
def when_done(self):
return self._done_observers.when_fired()
+ def get_times(self):
+ return self._times
class LocalCiphertextReader(AskUntilSuccessMixin):
return self.parent.log(*args, **kwargs)
def remote_upload_chk(self, storage_index):
+ r = upload.UploadResults()
+ started = time.time()
si_s = idlib.b2a(storage_index)
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
incoming_file = os.path.join(self._chk_incoming, si_s)
uh = self._active_uploads[storage_index]
return uh.start()
- d = self._check_for_chk_already_in_grid(storage_index, lp)
- def _checked(upload_results):
- if upload_results:
+ d = self._check_for_chk_already_in_grid(storage_index, r, lp)
+ def _checked(already_present):
+ elapsed = time.time() - started
+ r.timings['existence_check'] = elapsed
+ if already_present:
+ # the necessary results are placed in the UploadResults
self.log("file already found in grid", parent=lp)
- return (upload_results, None)
+ return (r, None)
# the file is not present in the grid, by which we mean there are
# less than 'N' shares available.
self.log("creating new upload helper", parent=lp)
uh = self.chk_upload_helper_class(storage_index, self,
incoming_file, encoding_file,
- lp)
+ r, lp)
self._active_uploads[storage_index] = uh
return uh.start()
d.addCallback(_checked)
d.addErrback(_err)
return d
- def _check_for_chk_already_in_grid(self, storage_index, lp):
+ def _check_for_chk_already_in_grid(self, storage_index, results, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
- upload_results = {'uri_extension_hash': ueb_hash}
- return upload_results
+ results.uri_extension_hash = ueb_hash
+ return True
return False
d.addCallback(_checked)
return d
class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
def start(self):
- res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
+ res = upload.UploadResults()
+ res.uri_extension_hash = hashutil.uri_extension_hash("")
return (res, None)
class FakeClient(service.MultiService):
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
-from foolscap import Referenceable
+from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log
class TooFullError(Exception):
pass
-class UploadResults:
+class UploadResults(Copyable, RemoteCopy):
implements(IUploadResults)
+ typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
+ copytype = typeToCopy
+
+ file_size = None
uri = None
sharemap = None # dict of shnum to placement string
servermap = None # dict of peerid to set(shnums)
def __init__(self):
self.timings = {} # dict of name to number of seconds
- self.rates = {} # dict of name to rates (in bytes per second)
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)
+ started = time.time()
self._encoder = e = encode.Encoder(self._log_number)
d = e.set_encrypted_uploadable(eu)
- d.addCallback(self.locate_all_shareholders)
+ d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
d.addCallback(lambda res: e.start())
+ d.addCallback(self._encrypted_done)
# this fires with the uri_extension_hash and other data
return d
- def locate_all_shareholders(self, encoder):
+ def locate_all_shareholders(self, encoder, started):
+ peer_selection_started = now = time.time()
+ self._storage_index_elapsed = now - started
storage_index = encoder.get_param("storage_index")
upload_id = idlib.b2a(storage_index)[:6]
self.log("using storage index %s" % upload_id)
share_size, block_size,
num_segments, n, desired)
def _done(res):
- self._peer_selection_finished = time.time()
+ self._peer_selection_elapsed = time.time() - peer_selection_started
return res
d.addCallback(_done)
return d
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
- def _compute_uri(self, (uri_extension_hash,
- needed_shares, total_shares, size),
- key):
- u = uri.CHKFileURI(key=key,
- uri_extension_hash=uri_extension_hash,
- needed_shares=needed_shares,
- total_shares=total_shares,
- size=size,
- )
+ def _encrypted_done(self, res):
r = self._results
- r.uri = u.to_string()
r.sharemap = {}
r.servermap = {}
for shnum in self._encoder.get_shares_placed():
if peerid not in r.servermap:
r.servermap[peerid] = set()
r.servermap[peerid].add(shnum)
- peer_selection_time = (self._peer_selection_finished
- - self._peer_selection_started)
now = time.time()
+ r.file_size = self._encoder.file_size
r.timings["total"] = now - self._started
- r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
- r.timings["peer_selection"] = peer_selection_time
+ r.timings["storage_index"] = self._storage_index_elapsed
+ r.timings["peer_selection"] = self._peer_selection_elapsed
r.timings.update(self._encoder.get_times())
- r.rates.update(self._encoder.get_rates())
+ return res
+
+ def _compute_uri(self, (uri_extension_hash,
+ needed_shares, total_shares, size),
+ key):
+ u = uri.CHKFileURI(key=key,
+ uri_extension_hash=uri_extension_hash,
+ needed_shares=needed_shares,
+ total_shares=total_shares,
+ size=size,
+ )
+ r = self._results
+ r.uri = u.to_string()
return r
def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
- d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
+ def _got_size(size):
+ self._results.file_size = size
+ return read_this_many_bytes(uploadable, size)
+ d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
d.addCallback(lambda u: u.to_string())
d.addCallback(self._build_results)
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
- self._results = UploadResults()
def log(self, msg, parent=None, **kwargs):
if parent is None:
self._storage_index = storage_index
def _contact_helper(self, res):
- now = self._time_contacting_helper = time.time()
- self._results.timings["local_hashing"] = now - self._started
+ now = self._time_contacting_helper_start = time.time()
+ self._storage_index_elapsed = now - self._started
self.log("contacting helper..")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
now = time.time()
- elapsed = now - self._time_contacting_helper
- self._results.timings["contacting_helper"] = elapsed
+ elapsed = now - self._time_contacting_helper_start
+ self._elapsed_time_contacting_helper = elapsed
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
def _build_readcap(self, upload_results):
self.log("upload finished, building readcap")
- ur = upload_results
+ r = upload_results
u = uri.CHKFileURI(key=self._key,
- uri_extension_hash=ur['uri_extension_hash'],
+ uri_extension_hash=r.uri_extension_hash,
needed_shares=self._needed_shares,
total_shares=self._total_shares,
size=self._size,
)
- r = self._results
r.uri = u.to_string()
now = time.time()
+ r.file_size = self._size
+ r.timings["storage_index"] = self._storage_index_elapsed
+ r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
+ if "total" in r.timings:
+ r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
- r.rates["total"] = 1.0 * self._size / r.timings["total"]
return r
class NoParameterPreferencesMixin:
<li>Servermap: <span n:render="servermap" /></li>
<li>Timings:</li>
<ul>
+ <li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
+ <li>Storage Index: <span n:render="time" n:data="time_storage_index" />
+ (<span n:render="rate" n:data="rate_storage_index" />)</li>
<li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
<ul>
d.addCallback(_render)
return d
+ def data_file_size(self, ctx, data):
+ d = self.upload_results()
+ d.addCallback(lambda res: res.file_size)
+ return d
+
def render_time(self, ctx, data):
# 1.23s, 790ms, 132us
if data is None:
return "%.1fkBps" % (r/1000)
return "%dBps" % r
- def data_time_total(self, ctx, data):
+ def _get_time(self, name):
d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("total"))
+ d.addCallback(lambda res: res.timings.get(name))
return d
+ def data_time_total(self, ctx, data):
+ return self._get_time("total")
+
+ def data_time_storage_index(self, ctx, data):
+ return self._get_time("storage_index")
+
def data_time_peer_selection(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("peer_selection"))
- return d
+ return self._get_time("peer_selection")
def data_time_total_encode_and_push(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
- return d
+ return self._get_time("total_encode_and_push")
def data_time_cumulative_encoding(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
- return d
+ return self._get_time("cumulative_encoding")
def data_time_cumulative_sending(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("cumulative_sending"))
- return d
+ return self._get_time("cumulative_sending")
def data_time_hashes_and_close(self, ctx, data):
+ return self._get_time("hashes_and_close")
+
+ def _get_rate(self, name):
d = self.upload_results()
- d.addCallback(lambda res: res.timings.get("hashes_and_close"))
+ def _convert(r):
+ file_size = r.file_size
+ time = r.timings.get(name)
+ if time is None:
+ return None
+ try:
+ return 1.0 * file_size / time
+ except ZeroDivisionError:
+ return None
+ d.addCallback(_convert)
return d
def data_rate_total(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.rates.get("total"))
- return d
+ return self._get_rate("total")
+
+ def data_rate_storage_index(self, ctx, data):
+ return self._get_rate("storage_index")
def data_rate_encode(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.rates.get("encode"))
- return d
+ return self._get_rate("cumulative_encoding")
def data_rate_push(self, ctx, data):
- d = self.upload_results()
- d.addCallback(lambda res: res.rates.get("push"))
- return d
+ return self._get_rate("cumulative_sending")
class UnlinkedPOSTSSKUploader(rend.Page):