from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_servers, \
failure_message
-from allmydata.util.assertutil import precondition
+from allmydata.util.assertutil import precondition, _assert
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
- DEFAULT_MAX_SEGMENT_SIZE
+ DEFAULT_MAX_SEGMENT_SIZE, IProgress
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
class TooFullError(Exception):
    """Exception used by the upload machinery; the name suggests a
    storage server with no room left -- confirm exact condition at the
    call sites."""
-class UploadResults(Copyable, RemoteCopy):
- implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
# note: don't change this string, it needs to match the value used on the
# helper, and it does *not* need to match the fully-qualified
# package/module/class name
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
class UploadResults:
    """Purely-local record of one immutable upload: sizes, share placement,
    timings, and URI-extension data.

    Unlike HelperUploadResults (which must keep a wire-compatible format
    for old Helpers), this object never crosses the wire; helper results
    are converted into one of these upon receipt.
    """
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(server)}
                 servermap, # {server: set(shnum)}
                 timings, # dict of name to number of seconds
                 uri_extension_data,
                 uri_extension_hash,
                 verifycapstr):
        self._file_size = file_size
        self._ciphertext_fetched = ciphertext_fetched
        self._preexisting_shares = preexisting_shares
        self._pushed_shares = pushed_shares
        self._sharemap = sharemap
        self._servermap = servermap
        self._timings = timings
        self._uri_extension_data = uri_extension_data
        self._uri_extension_hash = uri_extension_hash
        self._verifycapstr = verifycapstr

    def set_uri(self, uri):
        # The read-cap (as a string) is only known after the encryption key
        # is folded in, so it is attached after construction.
        self._uri = uri

    # Plain accessors for the fields above; get_uri() will raise
    # AttributeError if set_uri() was never called.
    def get_file_size(self):
        return self._file_size
    def get_uri(self):
        return self._uri
    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched
    def get_preexisting_shares(self):
        return self._preexisting_shares
    def get_pushed_shares(self):
        return self._pushed_shares
    def get_sharemap(self):
        return self._sharemap
    def get_servermap(self):
        return self._servermap
    def get_timings(self):
        return self._timings
    def get_uri_extension_data(self):
        return self._uri_extension_data
    def get_verifycapstr(self):
        return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
return ("<ServerTracker for server %s and SI %s>"
% (self._server.get_name(), si_b2a(self.storage_index)[:5]))
    def get_server(self):
        # Return the server object this tracker is bound to (the same
        # object whose get_name() appears in __repr__).
        return self._server
    def get_serverid(self):
        # Convenience: the server's id, delegated to the tracked server.
        return self._server.get_serverid()
def get_name(self):
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- writable_servers = [server for server in all_servers
+ writeable_servers = [server for server in all_servers
if _get_maxsize(server) >= allocated_size]
- readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+ readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
# second-pass list and repeat the "second" pass (really the third,
# fourth, etc pass), until all shares are assigned, or we've run out
# of potential servers.
- self.first_pass_trackers = _make_trackers(writable_servers)
+ self.first_pass_trackers = _make_trackers(writeable_servers)
self.second_pass_trackers = [] # servers worth asking again
self.next_pass_trackers = [] # servers that we have asked again
self._started_second_pass = False
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024
    def __init__(self, original, log_parent=None, progress=None):
        # Refuse to wrap an uploadable whose encoding parameters were never
        # set: later steps depend on them, so fail early and loudly.
        precondition(original.default_params_set,
                     "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None   # created lazily
        self._file_size = None   # learned from the uploadable later
        self._ciphertext_bytes_read = 0
        self._status = None      # optional IUploadStatus, set via set_upload_status
        self._progress = progress  # optional progress reporter; may be None
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
self._file_size = size
if self._status:
self._status.set_size(size)
+ if self._progress:
+ self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
class CHKUploader:
server_selector_class = Tahoe2ServerSelector
- def __init__(self, storage_broker, secret_holder):
+ def __init__(self, storage_broker, secret_holder, progress=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._log_number = self.log("CHKUploader starting", parent=None)
self._encoder = None
- self._results = UploadResults()
self._storage_index = None
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
- self._upload_status.set_results(self._results)
+ self._progress = progress
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
eu = IEncryptedUploadable(encrypted)
started = time.time()
- self._encoder = e = encode.Encoder(self._log_number,
- self._upload_status)
+ self._encoder = e = encode.Encoder(
+ self._log_number,
+ self._upload_status,
+ progress=self._progress,
+ )
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
for st in upload_trackers], already_serverids)
self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
- self._results.preexisting_shares = len(already_serverids)
+ self._count_preexisting_shares = len(already_serverids)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
for tracker in upload_trackers:
encoder.set_shareholders(buckets, servermap)
    def _encrypted_done(self, verifycap):
        """Build and return the UploadResults for this completed upload."""
        # NOTE(review): an older docstring claimed a Deferred is returned;
        # the code returns the UploadResults instance directly (the caller's
        # Deferred chain then fires with it).
        e = self._encoder
        # Map each placed share number to the server that holds it, and
        # build the inverse server->shnums map at the same time.
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server = self._server_trackers[shnum].get_server()
            sharemap.add(shnum, server)
            servermap.add(server, shnum)
        now = time.time()
        # Assemble per-phase timings; the encoder contributes its own.
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        # ciphertext_fetched=0: no helper was involved in this (CHK) path.
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur
    def get_upload_status(self):
        # Expose the UploadStatus object (for history/status displays).
        return self._upload_status
class LiteralUploader:
    def __init__(self, progress=None):
        # Literal uploads need no storage index or helper: mark the status
        # object accordingly and start inactive.
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        self._progress = progress  # optional progress reporter; may be None
def start(self, uploadable):
uploadable = IUploadable(uploadable)
def _got_size(size):
self._size = size
self._status.set_size(size)
- self._results.file_size = size
+ if self._progress:
+ self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
return d
    def _build_results(self, uri):
        """Wrap the finished literal URI in an UploadResults and mark the
        upload complete."""
        # A literal file carries its data in the cap itself, so no shares
        # were pushed and nothing was fetched from a helper: all placement
        # fields are empty/zero.
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           pushed_shares=0,
                           sharemap={},
                           servermap={},
                           timings={},
                           uri_extension_data=None,
                           uri_extension_hash=None,
                           verifycapstr=None)
        ur.set_uri(uri)
        self._status.set_status("Finished")
        # Mark both remaining progress phases (1 and 2) as done.
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
        if self._progress:
            self._progress.set_progress(self._size)
        return ur
    def close(self):
        # Nothing to release for a literal upload; kept to satisfy the
        # common uploader interface.
        pass
class AssistedUploader:
- def __init__(self, helper):
+ def __init__(self, helper, storage_broker):
self._helper = helper
+ self._storage_broker = storage_broker
self._log_number = log.msg("AssistedUploader starting")
self._storage_index = None
self._upload_status = s = UploadStatus()
d.addCallback(self._contacted_helper)
return d
- def _contacted_helper(self, (upload_results, upload_helper)):
+ def _contacted_helper(self, (helper_upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
return d
self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
- self._upload_status.set_results(upload_results)
- return upload_results
+ return helper_upload_results
def _convert_old_upload_results(self, upload_results):
# pre-1.3.0 helpers return upload results which contain a mapping
if str in [type(v) for v in sharemap.values()]:
upload_results.sharemap = None
    def _build_verifycap(self, helper_upload_results):
        """Convert the Helper's wire-format results into a local
        UploadResults, rebuilding the verify-cap from the UEB hash."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        # Sanity-check that the helper encoded with the parameters we asked for.
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # The helper's results may not include a verifycap (e.g. when the
        # file was already present), so rebuild it locally from the
        # URI-extension hash and our own parameters.
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        # Merge our local timings with the helper's; the helper's "total"
        # is renamed so it doesn't clobber our overall total below.
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
            if key == "total":
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        # The helper speaks in raw serverids; translate them into local
        # server-stub objects via the storage broker.
        gss = self._storage_broker.get_stub_server
        sharemap = {}
        servermap = {}
        for shnum, serverids in hur.sharemap.items():
            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
        # if the file was already in the grid, hur.servermap is an empty dict
        for serverid, shnums in hur.servermap.items():
            servermap[gss(serverid)] = set(shnums)

        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())

        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur
    def get_upload_status(self):
        # Expose the UploadStatus object (for history/status displays).
        return self._upload_status
class BaseUploadable:
# this is overridden by max_segment_size
default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
- default_encoding_param_k = 3 # overridden by encoding_parameters
- default_encoding_param_happy = 7
- default_encoding_param_n = 10
+ default_params_set = False
max_segment_size = None
encoding_param_k = None
self.default_encoding_param_n = default_params["n"]
if "max_segment_size" in default_params:
self.default_max_segment_size = default_params["max_segment_size"]
+ self.default_params_set = True
def get_all_encoding_parameters(self):
+ _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
if self._all_encoding_parameters:
return defer.succeed(self._all_encoding_parameters)
def get_size(self):
if self._size is not None:
return defer.succeed(self._size)
- self._filehandle.seek(0,2)
+ self._filehandle.seek(0, os.SEEK_END)
size = self._filehandle.tell()
self._size = size
self._filehandle.seek(0)
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
    def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
        # helper_furl: optional FURL of an upload helper; when set (and
        # connected) uploads go through AssistedUploader instead of CHKUploader.
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history    # optional; receives each upload's status
        self._helper = None        # set once the helper connection exists
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._progress = progress  # optional default progress reporter
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
return (self._helper_furl, bool(self._helper))
- def upload(self, uploadable, history=None):
+ def upload(self, uploadable, progress=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
+ assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
+ if progress:
+ progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader()
+ uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2 = defer.succeed(None)
+ storage_broker = self.parent.get_storage_broker()
if self._helper:
- uploader = AssistedUploader(self._helper)
+ uploader = AssistedUploader(self._helper, storage_broker)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
- uploader = CHKUploader(storage_broker, secret_holder)
+ uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
- if history:
- history.add_upload(uploader.get_upload_status())
+ if self._history:
+ self._history.add_upload(uploader.get_upload_status())
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
def put_readcap_into_results(key):
- v = uri.from_string(uploadresults.verifycapstr)
+ v = uri.from_string(uploadresults.get_verifycapstr())
r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
- uploadresults.uri = r.to_string()
+ uploadresults.set_uri(r.to_string())
return uploadresults
d3.addCallback(put_readcap_into_results)
return d3