-import os
+import os, time
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
class UploadResults:
implements(IUploadResults)
+ uri = None
+ sharemap = None # dict of shnum to placement string
+ servermap = None # dict of peerid to set(shnums)
+ def __init__(self):
+ self.timings = {} # dict of name to number of seconds
+ self.rates = {} # dict of name to rates (in bytes per second)
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
self._default_encoding_parameters = default_encoding_parameters
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
+ self._results = UploadResults()
def log(self, *args, **kwargs):
if "parent" not in kwargs:
This method returns a Deferred that will fire with the URI (a
string)."""
+ self._started = time.time()
uploadable = IUploadable(uploadable)
self.log("starting upload of %s" % uploadable)
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
+ self._peer_selection_started = time.time()
d = peer_selector.get_shareholders(self._client, storage_index,
share_size, block_size,
num_segments, n, desired)
+ def _done(res):
+ self._peer_selection_finished = time.time()
+ return res
+ d.addCallback(_done)
return d
def set_shareholders(self, used_peers, encoder):
@param used_peers: a sequence of PeerTracker objects
"""
self.log("_send_shares, used_peers is %s" % (used_peers,))
+ self._sharemap = {}
for peer in used_peers:
assert isinstance(peer, PeerTracker)
buckets = {}
for peer in used_peers:
buckets.update(peer.buckets)
+ for shnum in peer.buckets:
+ self._sharemap[shnum] = peer
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
total_shares=total_shares,
size=size,
)
- results = UploadResults()
- results.uri = u.to_string()
- return results
+ r = self._results
+ r.uri = u.to_string()
+ r.sharemap = {}
+ r.servermap = {}
+ for shnum in self._encoder.get_shares_placed():
+ peer_tracker = self._sharemap[shnum]
+ peerid = peer_tracker.peerid
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ r.sharemap[shnum] = "Placed on [%s]" % peerid_s
+ if peerid not in r.servermap:
+ r.servermap[peerid] = set()
+ r.servermap[peerid].add(shnum)
+ peer_selection_time = (self._peer_selection_finished
+ - self._peer_selection_started)
+ now = time.time()
+ r.timings["total"] = now - self._started
+ r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
+ r.timings["peer_selection"] = peer_selection_time
+ r.timings.update(self._encoder.get_times())
+ r.rates.update(self._encoder.get_rates())
+ return r
def read_this_many_bytes(uploadable, size, prepend_data=[]):
def __init__(self, client):
self._client = client
+ self._results = UploadResults()
def set_params(self, encoding_parameters):
pass
return d
def _build_results(self, uri):
- results = UploadResults()
- results.uri = uri
- return results
+ self._results.uri = uri
+ return self._results
def close(self):
pass
assert isinstance(default_encoding_parameters, dict)
self._default_encoding_parameters = default_encoding_parameters
self._log_number = log.msg("AssistedUploader starting")
+ self._results = UploadResults()
def log(self, msg, parent=None, **kwargs):
if parent is None:
return log.msg(msg, parent=parent, **kwargs)
def start(self, uploadable):
+ self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u, self._default_encoding_parameters)
self._encuploadable = eu
self._storage_index = storage_index
def _contact_helper(self, res):
+ now = self._time_contacting_helper = time.time()
+ self._results.timings["local_hashing"] = now - self._started
self.log("contacting helper..")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
def _contacted_helper(self, (upload_results, upload_helper)):
+ now = time.time()
+ elapsed = now - self._time_contacting_helper
+ self._results.timings["contacting_helper"] = elapsed
if upload_helper:
self.log("helper says we need to upload")
# we need to upload the file
total_shares=self._total_shares,
size=self._size,
)
- results = UploadResults()
- results.uri = u.to_string()
- return results
+ r = self._results
+ r.uri = u.to_string()
+ now = time.time()
+ r.timings["total"] = now - self._started
+ r.rates["total"] = 1.0 * self._size / r.timings["total"]
+ return r
class NoParameterPreferencesMixin:
max_segment_size = None