sm = prr.data['sharemap']
assert isinstance(sm, DictOfSets), sm
- sm.update(ur.sharemap)
+ sm.update(ur.get_sharemap())
servers_responding = set(prr.data['servers-responding'])
- for shnum, serverids in ur.sharemap.items():
+ for shnum, serverids in ur.get_sharemap().items():
servers_responding.update(serverids)
servers_responding = sorted(servers_responding)
prr.data['servers-responding'] = servers_responding
return self._finished_observers.when_fired()
def _finished(self, ur):
- precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr)
assert interfaces.IUploadResults.providedBy(ur), ur
- v = uri.from_string(ur.verifycapstr)
+ vcapstr = ur.get_verifycapstr()
+ precondition(isinstance(vcapstr, str), vcapstr)
+ v = uri.from_string(vcapstr)
f_times = self._fetcher.get_times()
hur = upload.HelperUploadResults()
hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
"total_fetch": f_times["total"],
}
- for k in ur.timings:
- hur.timings[k] = ur.timings[k]
+ for key,val in ur.get_timings().items():
+ hur.timings[key] = val
hur.uri_extension_hash = v.uri_extension_hash
hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
- hur.preexisting_shares = ur.preexisting_shares
- hur.sharemap = ur.sharemap
- hur.servermap = ur.servermap
- hur.pushed_shares = ur.pushed_shares
- hur.file_size = ur.file_size
- hur.uri_extension_data = ur.uri_extension_data
- hur.verifycapstr = ur.verifycapstr
+ hur.preexisting_shares = ur.get_preexisting_shares()
+ hur.sharemap = ur.get_sharemap()
+ hur.servermap = ur.get_servermap()
+ hur.pushed_shares = ur.get_pushed_shares()
+ hur.file_size = ur.get_file_size()
+ hur.uri_extension_data = ur.get_uri_extension_data()
+ hur.verifycapstr = vcapstr
self._reader.close()
os.unlink(self._encoding_file)
uri_extension_data,
uri_extension_hash,
verifycapstr):
- self.file_size = file_size
- self.ciphertext_fetched = ciphertext_fetched
- self.preexisting_shares = preexisting_shares
- self.pushed_shares = pushed_shares
- self.sharemap = sharemap
- self.servermap = servermap
- self.timings = timings
- self.uri_extension_data = uri_extension_data
- self.uri_extension_hash = uri_extension_hash
- self.verifycapstr = verifycapstr
+ self._file_size = file_size
+ self._ciphertext_fetched = ciphertext_fetched
+ self._preexisting_shares = preexisting_shares
+ self._pushed_shares = pushed_shares
+ self._sharemap = sharemap
+ self._servermap = servermap
+ self._timings = timings
+ self._uri_extension_data = uri_extension_data
+ self._uri_extension_hash = uri_extension_hash
+ self._verifycapstr = verifycapstr
self.uri = None
def set_uri(self, uri):
self.uri = uri
+ def get_file_size(self):
+ return self._file_size
+ def get_ciphertext_fetched(self):
+ return self._ciphertext_fetched
+ def get_preexisting_shares(self):
+ return self._preexisting_shares
+ def get_pushed_shares(self):
+ return self._pushed_shares
+ def get_sharemap(self):
+ return self._sharemap
+ def get_servermap(self):
+ return self._servermap
+ def get_timings(self):
+ return self._timings
+ def get_uri_extension_data(self):
+ return self._uri_extension_data
+ def get_verifycapstr(self):
+ return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
def put_readcap_into_results(key):
- v = uri.from_string(uploadresults.verifycapstr)
+ v = uri.from_string(uploadresults.get_verifycapstr())
r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
uploadresults.set_uri(r.to_string())
return uploadresults
"""
class IUploadResults(Interface):
- """I am returned by upload() methods. I contain a number of public
- attributes which can be read to determine the results of the upload. Some
- of these are functional, some are timing information. All of these may be
- None.
+ """I am returned by immutable upload() methods and contain the results of
+ the upload.
- .file_size : the size of the file, in bytes
+ I contain one public attribute:
.uri : the CHK read-cap for the file
- .ciphertext_fetched : how many bytes were fetched by the helper
- .sharemap: dict mapping share identifier to set of serverids
- (binary strings). This indicates which servers were given
- which shares. For immutable files, the shareid is an
- integer (the share number, from 0 to N-1). For mutable
- files, it is a string of the form 'seq%d-%s-sh%d',
- containing the sequence number, the roothash, and the
- share number.
- .servermap : dict mapping server peerid to a set of share numbers
- .timings : dict of timing information, mapping name to seconds (float)
- total : total upload time, start to finish
- storage_index : time to compute the storage index
- peer_selection : time to decide which peers will be used
- contacting_helper : initial helper query to upload/no-upload decision
- helper_total : initial helper query to helper finished pushing
- cumulative_fetch : helper waiting for ciphertext requests
- total_fetch : helper start to last ciphertext response
- cumulative_encoding : just time spent in zfec
- cumulative_sending : just time spent waiting for storage servers
- hashes_and_close : last segment push to shareholder close
- total_encode_and_push : first encode to shareholder close
-
"""
+ # some methods return empty values (0 or an empty dict) when called for
+ # non-distributed LIT files
+ def get_file_size():
+ """Return the file size, in bytes."""
+ def get_ciphertext_fetched():
+ """Return the number of bytes fetched by the helpe for this upload,
+ or 0 if the helper did not need to fetch any bytes (or if there was
+ no helper)."""
+ def get_preexisting_shares():
+ """Return the number of shares that were already present in the grid."""
+ def get_pushed_shares():
+ """Return the number of shares that were uploaded."""
+ def get_sharemap():
+ """Return a dict mapping share identifier to set of serverids (binary
+ strings). This indicates which servers were given which shares. For
+ immutable files, the shareid is an integer (the share number, from 0
+ to N-1). For mutable files, it is a string of the form
+ 'seq%d-%s-sh%d', containing the sequence number, the roothash, and
+ the share number."""
+ def get_servermap():
+ """Return dict mapping server peerid to a set of share numbers."""
+ def get_timings():
+ """Return dict of timing information, mapping name to seconds. All
+ times are floats:
+ total : total upload time, start to finish
+ storage_index : time to compute the storage index
+ peer_selection : time to decide which peers will be used
+ contacting_helper : initial helper query to upload/no-upload decision
+ helper_total : initial helper query to helper finished pushing
+ cumulative_fetch : helper waiting for ciphertext requests
+ total_fetch : helper start to last ciphertext response
+ cumulative_encoding : just time spent in zfec
+ cumulative_sending : just time spent waiting for storage servers
+ hashes_and_close : last segment push to shareholder close
+ total_encode_and_push : first encode to shareholder close
+ """
+ def get_uri_extension_data():
+ """Return the dict of UEB data created for this file."""
+ def get_verifycapstr():
+ """Return the (string) verify-cap URI for the uploaded object."""
+
class IDownloadResults(Interface):
"""I am created internally by download() methods. I contain a number of
public attributes which contain details about the download process.::
# this is really bytes received rather than sent, but it's
# convenient and basically measures the same thing
- bytes_sent = results.ciphertext_fetched
+ bytes_sent = results.get_ciphertext_fetched()
self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
# We currently don't support resumption of upload if the data is
# Make sure that only as many shares as necessary to satisfy
# servers of happiness were pushed.
d.addCallback(lambda results:
- self.failUnlessEqual(results.pushed_shares, 3))
+ self.failUnlessEqual(results.get_pushed_shares(), 3))
d.addCallback(lambda ign:
self.failUnless(self._has_happy_share_distribution()))
return d
def render_pushed_shares(self, ctx, data):
d = self.upload_results()
- d.addCallback(lambda res: res.pushed_shares)
+ d.addCallback(lambda res: res.get_pushed_shares())
return d
def render_preexisting_shares(self, ctx, data):
d = self.upload_results()
- d.addCallback(lambda res: res.preexisting_shares)
+ d.addCallback(lambda res: res.get_preexisting_shares())
return d
def render_sharemap(self, ctx, data):
d = self.upload_results()
- d.addCallback(lambda res: res.sharemap)
+ d.addCallback(lambda res: res.get_sharemap())
def _render(sharemap):
if sharemap is None:
return "None"
def render_servermap(self, ctx, data):
d = self.upload_results()
- d.addCallback(lambda res: res.servermap)
+ d.addCallback(lambda res: res.get_servermap())
def _render(servermap):
if servermap is None:
return "None"
def data_file_size(self, ctx, data):
d = self.upload_results()
- d.addCallback(lambda res: res.file_size)
+ d.addCallback(lambda res: res.get_file_size())
return d
def _get_time(self, name):
d = self.upload_results()
- d.addCallback(lambda res: res.timings.get(name))
+ d.addCallback(lambda res: res.get_timings().get(name))
return d
def data_time_total(self, ctx, data):
def _get_rate(self, name):
d = self.upload_results()
def _convert(r):
- file_size = r.file_size
- time = r.timings.get(name)
+ file_size = r.get_file_size()
+ time = r.get_timings().get(name)
return compute_rate(file_size, time)
d.addCallback(_convert)
return d
def data_rate_encode_and_push(self, ctx, data):
d = self.upload_results()
def _convert(r):
- file_size = r.file_size
- time1 = r.timings.get("cumulative_encoding")
- time2 = r.timings.get("cumulative_sending")
+ file_size = r.get_file_size()
+ time1 = r.get_timings().get("cumulative_encoding")
+ time2 = r.get_timings().get("cumulative_sending")
if (time1 is None or time2 is None):
return None
else:
def data_rate_ciphertext_fetch(self, ctx, data):
d = self.upload_results()
def _convert(r):
- fetch_size = r.ciphertext_fetched
- time = r.timings.get("cumulative_fetch")
+ fetch_size = r.get_ciphertext_fetched()
+ time = r.get_timings().get("cumulative_fetch")
return compute_rate(fetch_size, time)
d.addCallback(_convert)
return d