from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
- DEFAULT_MAX_SEGMENT_SIZE
+ DEFAULT_MAX_SEGMENT_SIZE, IProgress
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024
- def __init__(self, original, log_parent=None):
+ def __init__(self, original, log_parent=None, progress=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
+ self._progress = progress
def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
self._file_size = size
if self._status:
self._status.set_size(size)
+ if self._progress:
+ self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
class CHKUploader:
server_selector_class = Tahoe2ServerSelector
- def __init__(self, storage_broker, secret_holder):
+ def __init__(self, storage_broker, secret_holder, progress=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
+ self._progress = progress
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
eu = IEncryptedUploadable(encrypted)
started = time.time()
- self._encoder = e = encode.Encoder(self._log_number,
- self._upload_status)
+ self._encoder = e = encode.Encoder(
+ self._log_number,
+ self._upload_status,
+ progress=self._progress,
+ )
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
class LiteralUploader:
- def __init__(self):
+ def __init__(self, progress=None):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
+ self._progress = progress
def start(self, uploadable):
uploadable = IUploadable(uploadable)
def _got_size(size):
self._size = size
self._status.set_size(size)
+ if self._progress:
+ self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
+ if self._progress:
+ self._progress.set_progress(self._size)
return ur
def close(self):
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
- def __init__(self, helper_furl=None, stats_provider=None, history=None):
+ def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+ self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
return (self._helper_furl, bool(self._helper))
- def upload(self, uploadable):
+ def upload(self, uploadable, progress=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
+ assert progress is None or IProgress.providedBy(progress)
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
+ if progress:
+ progress.set_progress_total(size)
if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader()
+ uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
- uploader = CHKUploader(storage_broker, secret_holder)
+ uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None