git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add upload-status objects, to track upload progress
authorBrian Warner <warner@allmydata.com>
Tue, 12 Feb 2008 22:36:05 +0000 (15:36 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 12 Feb 2008 22:36:05 +0000 (15:36 -0700)
src/allmydata/encode.py
src/allmydata/interfaces.py
src/allmydata/offloaded.py
src/allmydata/upload.py

index d662e173156d5a06b3a595a852160111b9017663..993b12f1493209ff85976051e722563656d84f28 100644 (file)
@@ -10,7 +10,7 @@ from allmydata.util import mathutil, hashutil, idlib, log
 from allmydata.util.assertutil import _assert, precondition
 from allmydata.codec import CRSEncoder
 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
-     IEncryptedUploadable
+     IEncryptedUploadable, IUploadStatus
 
 """
 The goal of the encoder is to turn the original file into a series of
@@ -74,10 +74,13 @@ PiB=1024*TiB
 class Encoder(object):
     implements(IEncoder)
 
-    def __init__(self, log_parent=None):
+    def __init__(self, log_parent=None, upload_status=None):
         object.__init__(self)
         self.uri_extension_data = {}
         self._codec = None
+        self._status = None
+        if upload_status:
+            self._status = IUploadStatus(upload_status)
         precondition(log_parent is None or isinstance(log_parent, int),
                      log_parent)
         self._log_number = log.msg("creating Encoder %s" % self,
@@ -247,6 +250,18 @@ class Encoder(object):
         d.addCallbacks(lambda res: self.done(), self.err)
         return d
 
+    def set_status(self, status):
+        if self._status:
+            self._status.set_status(status)
+
+    def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
+        if self._status:
+            # we treat the final hash+close as an extra segment
+            if sent_segments is None:
+                sent_segments = self.num_segments
+            progress = float(sent_segments + extra) / (self.num_segments + 1)
+            self._status.set_progress(2, progress)
+
     def abort(self):
         self.log("aborting upload", level=log.UNUSUAL)
         assert self._codec, "don't call abort before start"
@@ -269,6 +284,7 @@ class Encoder(object):
 
     def start_all_shareholders(self):
         self.log("starting shareholders", level=log.NOISY)
+        self.set_status("Starting shareholders")
         dl = []
         for shareid in self.landlords:
             d = self.landlords[shareid].start()
@@ -409,6 +425,9 @@ class Encoder(object):
                 shareids=shareids, landlords=self.landlords)
         start = time.time()
         dl = []
+        self.set_status("Sending segment %d of %d" % (segnum+1,
+                                                      self.num_segments))
+        self.set_encode_and_push_progress(segnum)
         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
         for i in range(len(shares)):
             subshare = shares[i]
@@ -488,6 +507,8 @@ class Encoder(object):
 
     def finish_hashing(self):
         self._start_hashing_and_close_timestamp = time.time()
+        self.set_status("Finishing hashes")
+        self.set_encode_and_push_progress(extra=0.0)
         crypttext_hash = self._crypttext_hasher.digest()
         self.uri_extension_data["crypttext_hash"] = crypttext_hash
         d = self._uploadable.get_plaintext_hash()
@@ -509,6 +530,8 @@ class Encoder(object):
 
     def send_plaintext_hash_tree_to_all_shareholders(self):
         self.log("sending plaintext hash tree", level=log.NOISY)
+        self.set_status("Sending Plaintext Hash Tree")
+        self.set_encode_and_push_progress(extra=0.2)
         dl = []
         for shareid in self.landlords.keys():
             d = self.send_plaintext_hash_tree(shareid,
@@ -526,6 +549,8 @@ class Encoder(object):
 
     def send_crypttext_hash_tree_to_all_shareholders(self):
         self.log("sending crypttext hash tree", level=log.NOISY)
+        self.set_status("Sending Crypttext Hash Tree")
+        self.set_encode_and_push_progress(extra=0.3)
         t = HashTree(self._crypttext_hashes)
         all_hashes = list(t)
         self.uri_extension_data["crypttext_root_hash"] = t[0]
@@ -544,6 +569,8 @@ class Encoder(object):
 
     def send_all_subshare_hash_trees(self):
         self.log("sending subshare hash trees", level=log.NOISY)
+        self.set_status("Sending Subshare Hash Trees")
+        self.set_encode_and_push_progress(extra=0.4)
         dl = []
         for shareid,hashes in enumerate(self.subshare_hashes):
             # hashes is a list of the hashes of all subshares that were sent
@@ -571,6 +598,8 @@ class Encoder(object):
         # not include the top-level hash root (which is stored securely in
         # the URI instead).
         self.log("sending all share hash trees", level=log.NOISY)
+        self.set_status("Sending Share Hash Trees")
+        self.set_encode_and_push_progress(extra=0.6)
         dl = []
         for h in self.share_root_hashes:
             assert h
@@ -597,6 +626,8 @@ class Encoder(object):
 
     def send_uri_extension_to_all_shareholders(self):
         lp = self.log("sending uri_extension", level=log.NOISY)
+        self.set_status("Sending URI Extensions")
+        self.set_encode_and_push_progress(extra=0.8)
         for k in ('crypttext_root_hash', 'crypttext_hash',
                   'plaintext_root_hash', 'plaintext_hash',
                   ):
@@ -623,6 +654,8 @@ class Encoder(object):
 
     def close_all_shareholders(self):
         self.log("closing shareholders", level=log.NOISY)
+        self.set_status("Closing Shareholders")
+        self.set_encode_and_push_progress(extra=0.9)
         dl = []
         for shareid in self.landlords:
             d = self.landlords[shareid].close()
@@ -632,6 +665,8 @@ class Encoder(object):
 
     def done(self):
         self.log("upload done", level=log.OPERATIONAL)
+        self.set_status("Done")
+        self.set_encode_and_push_progress(extra=1.0) # done
         now = time.time()
         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
         self._times["hashes_and_close"] = h_and_c_elapsed
@@ -645,6 +680,7 @@ class Encoder(object):
 
     def err(self, f):
         self.log("upload failed", failure=f, level=log.UNUSUAL)
+        self.set_status("Failed")
         # we need to abort any remaining shareholders, so they'll delete the
         # partial share, allowing someone else to upload it again.
         self.log("aborting shareholders", level=log.UNUSUAL)
index bf61994d3520aa1ad640e253b896fcb21e953d1d..64ae64699fa1aedb19d90d4dd006e1062b138040 100644 (file)
@@ -1104,6 +1104,13 @@ class IDownloader(Interface):
         when the download is finished, or errbacks if something went wrong."""
 
 class IEncryptedUploadable(Interface):
+    def set_upload_status(upload_status):
+        """Provide an IUploadStatus object that should be filled with status
+        information. The IEncryptedUploadable is responsible for setting
+        key-determination progress ('chk'), size, storage_index, and
+        ciphertext-fetch progress. It may delegate some of this
+        responsibility to others, in particular to the IUploadable."""
+
     def get_size():
         """This behaves just like IUploadable.get_size()."""
 
@@ -1165,6 +1172,11 @@ class IEncryptedUploadable(Interface):
         """Just like IUploadable.close()."""
 
 class IUploadable(Interface):
+    def set_upload_status(upload_status):
+        """Provide an IUploadStatus object that should be filled with status
+        information. The IUploadable is responsible for setting
+        key-determination progress ('chk')."""
+
     def set_default_encoding_parameters(params):
         """Set the default encoding parameters, which must be a dict mapping
         strings to ints. The meaningful keys are 'k', 'happy', 'n', and
@@ -1362,6 +1374,32 @@ class IClient(Interface):
                  IDirectoryNode-providing instances, like NewDirectoryNode.
         """
 
class IUploadStatus(Interface):
    """Status/progress record for a single upload. It is filled in by the
    upload machinery (see IEncryptedUploadable.set_upload_status and
    IUploadable.set_upload_status) and read by anything that wants to
    display upload progress."""

    def get_storage_index():
        """Return a string with the (binary) storage index in use on this
        upload. Returns None if the storage index has not yet been
        calculated."""

    def get_size():
        """Return an integer with the number of bytes that will eventually
        be uploaded for this file. Returns None if the size is not yet known.
        """

    def using_helper():
        """Return True if this upload is using a Helper, False if not."""

    def get_status():
        """Return a string describing the current state of the upload
        process."""

    def get_progress():
        """Returns a tuple of floats, (chk, ciphertext, encode_and_push),
        each from 0.0 to 1.0 . 'chk' describes how much progress has been
        made towards hashing the file to determine a CHK encryption key: if
        non-convergent encryption is in use, this will be trivial, otherwise
        the whole file must be hashed. 'ciphertext' describes how much of the
        ciphertext has been pushed to the helper, and is '1.0' for non-helper
        uploads. 'encode_and_push' describes how much of the encode-and-push
        process has finished: for helper uploads this is dependent upon the
        helper providing progress reports. It might be reasonable to add all
        three numbers and report the sum to the user."""
+
 
 class NotCapableError(Exception):
     """You have tried to write to a read-only node."""
index 319e05aed723412fe3de2db94f4d97b920deb72a..acc241381a96e3391d03f1c109aa2b8952c0dc81 100644 (file)
@@ -139,6 +139,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         upload_id = idlib.b2a(storage_index)[:6]
         self._log_number = log_number
         self._results = results
+        self._upload_status = upload.UploadStatus()
+        self._upload_status.set_helper(False)
         self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                          parent=log_number)
 
@@ -416,6 +418,10 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
         self._upload_helper = upload_helper
         self._storage_index = storage_index
         self._encoding_file = encoding_file
+        self._status = None
+
+    def set_upload_status(self, upload_status):
+        self._status = interfaces.IUploadStatus(upload_status)
 
     def start(self):
         self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
index 2b150d0b596a241c143e1add6d44b763c2d4de3c..3ea412bc6deae68d5d4fc58dc436674a4c666ee8 100644 (file)
@@ -1,5 +1,5 @@
 
-import os, time
+import os, time, weakref
 from zope.interface import implements
 from twisted.python import failure
 from twisted.internet import defer
@@ -16,7 +16,7 @@ from allmydata import encode, storage, hashtree, uri
 from allmydata.util import idlib, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
-     IEncryptedUploadable, RIEncryptedUploadable
+     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
 from pycryptopp.cipher.aes import AES
 
 from cStringIO import StringIO
@@ -113,12 +113,13 @@ class PeerTracker:
 
 class Tahoe2PeerSelector:
 
-    def __init__(self, upload_id, logparent=None):
+    def __init__(self, upload_id, logparent=None, upload_status=None):
         self.upload_id = upload_id
         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
         self.error_count = 0
         self.num_peers_contacted = 0
         self.last_failure_msg = None
+        self._status = IUploadStatus(upload_status)
         self._log_parent = log.msg("%s starting" % self, parent=logparent)
 
     def __repr__(self):
@@ -132,6 +133,9 @@ class Tahoe2PeerSelector:
                  shares for us
         """
 
+        if self._status:
+            self._status.set_status("Contacting Peers..")
+
         self.total_shares = total_shares
         self.shares_of_happiness = shares_of_happiness
 
@@ -204,6 +208,11 @@ class Tahoe2PeerSelector:
             shares_to_ask = set([self.homeless_shares.pop(0)])
             self.query_count += 1
             self.num_peers_contacted += 1
+            if self._status:
+                self._status.set_status("Contacting Peers [%s] (first query),"
+                                        " %d shares left.."
+                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                           len(self.homeless_shares)))
             d = peer.query(shares_to_ask)
             d.addBoth(self._got_response, peer, shares_to_ask,
                       self.contacted_peers)
@@ -220,6 +229,11 @@ class Tahoe2PeerSelector:
             shares_to_ask = set(self.homeless_shares[:num_shares])
             self.homeless_shares[:num_shares] = []
             self.query_count += 1
+            if self._status:
+                self._status.set_status("Contacting Peers [%s] (second query),"
+                                        " %d shares left.."
+                                        % (idlib.shortnodeid_b2a(peer.peerid),
+                                           len(self.homeless_shares)))
             d = peer.query(shares_to_ask)
             d.addBoth(self._got_response, peer, shares_to_ask,
                       self.contacted_peers2)
@@ -250,6 +264,8 @@ class Tahoe2PeerSelector:
                 raise encode.NotEnoughPeersError(msg)
             else:
                 # we placed enough to be happy, so we're done
+                if self._status:
+                    self._status.set_status("Placed all shares")
                 return self.use_peers
 
     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
@@ -339,6 +355,12 @@ class EncryptAnUploadable:
         self._plaintext_segment_hashes = []
         self._encoding_parameters = None
         self._file_size = None
+        self._ciphertext_bytes_read = 0
+        self._status = None
+
+    def set_upload_status(self, upload_status):
+        self._status = IUploadStatus(upload_status)
+        self.original.set_upload_status(upload_status)
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
@@ -351,6 +373,8 @@ class EncryptAnUploadable:
         d = self.original.get_size()
         def _got_size(size):
             self._file_size = size
+            if self._status:
+                self._status.set_size(size)
             return size
         d.addCallback(_got_size)
         return d
@@ -384,7 +408,8 @@ class EncryptAnUploadable:
             # specify that it is truncated to the same 128 bits as the AES key.
             assert len(storage_index) == 16  # SHA-256 truncated to 128b
             self._storage_index = storage_index
-
+            if self._status:
+                self._status.set_storage_index(storage_index)
             return e
         d.addCallback(_got)
         return d
@@ -432,6 +457,8 @@ class EncryptAnUploadable:
     def read_encrypted(self, length, hash_only):
         # make sure our parameters have been set up first
         d = self.get_all_encoding_parameters()
+        # and size
+        d.addCallback(lambda ignored: self.get_size())
         d.addCallback(lambda ignored: self._get_encryptor())
         # then fetch and encrypt the plaintext. The unusual structure here
         # (passing a Deferred *into* a function) is needed to avoid
@@ -481,10 +508,12 @@ class EncryptAnUploadable:
         cryptdata = []
         # we use data.pop(0) instead of 'for chunk in data' to save
         # memory: each chunk is destroyed as soon as we're done with it.
+        bytes_processed = 0
         while data:
             chunk = data.pop(0)
             log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
+            bytes_processed += len(chunk)
             self._plaintext_hasher.update(chunk)
             self._update_segment_hash(chunk)
             # TODO: we have to encrypt the data (even if hash_only==True)
@@ -499,6 +528,10 @@ class EncryptAnUploadable:
                 cryptdata.append(ciphertext)
             del ciphertext
             del chunk
+        self._ciphertext_bytes_read += bytes_processed
+        if self._status:
+            progress = float(self._ciphertext_bytes_read) / self._file_size
+            self._status.set_progress(1, progress)
         return cryptdata
 
 
@@ -526,6 +559,38 @@ class EncryptAnUploadable:
     def close(self):
         return self.original.close()
 
class UploadStatus:
    """Mutable record of one upload's metadata and progress.

    The upload machinery calls the set_* mutators as it runs; readers use
    the get_* accessors defined by IUploadStatus. The three progress slots
    are [0]=chk key determination, [1]=ciphertext fetch/push,
    [2]=encode-and-push.
    """
    implements(IUploadStatus)

    def __init__(self):
        # start with everything unknown / not begun
        self.progress = [0.0, 0.0, 0.0]
        self.status = "Not started"
        self.helper = False
        self.size = None
        self.storage_index = None

    # getter/setter pairs, grouped by attribute

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    def get_progress(self):
        # return an immutable snapshot, not the live list
        return tuple(self.progress)

    def set_progress(self, which, value):
        # which: 0=chk, 1=ciphertext, 2=encode+push
        self.progress[which] = value
 
 class CHKUploader:
     peer_selector_class = Tahoe2PeerSelector
@@ -535,6 +600,9 @@ class CHKUploader:
         self._log_number = self._client.log("CHKUploader starting")
         self._encoder = None
         self._results = UploadResults()
+        self._storage_index = None
+        self._upload_status = UploadStatus()
+        self._upload_status.set_helper(False)
 
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
@@ -554,6 +622,7 @@ class CHKUploader:
         self.log("starting upload of %s" % uploadable)
 
         eu = EncryptAnUploadable(uploadable)
+        eu.set_upload_status(self._upload_status)
         d = self.start_encrypted(eu)
         def _uploaded(res):
             d1 = uploadable.get_encryption_key()
@@ -575,7 +644,8 @@ class CHKUploader:
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
-        self._encoder = e = encode.Encoder(self._log_number)
+        self._encoder = e = encode.Encoder(self._log_number,
+                                           self._upload_status)
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
@@ -588,9 +658,11 @@ class CHKUploader:
         peer_selection_started = now = time.time()
         self._storage_index_elapsed = now - started
         storage_index = encoder.get_param("storage_index")
+        self._storage_index = storage_index
         upload_id = idlib.b2a(storage_index)[:6]
         self.log("using storage index %s" % upload_id)
-        peer_selector = self.peer_selector_class(upload_id, self._log_number)
+        peer_selector = self.peer_selector_class(upload_id, self._log_number,
+                                                 self._upload_status)
 
         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
@@ -657,6 +729,8 @@ class CHKUploader:
         r.uri = u.to_string()
         return r
 
+    def get_upload_status(self):
+        return self._upload_status
 
 def read_this_many_bytes(uploadable, size, prepend_data=[]):
     if size == 0:
@@ -680,11 +754,17 @@ class LiteralUploader:
    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        # For literal uploads no storage index is ever assigned and no
        # helper is involved, and the key-determination ('chk') progress
        # axis is complete from the start.
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
 
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
         def _got_size(size):
+            self._size = size
+            self._status.set_size(size)
             self._results.file_size = size
             return read_this_many_bytes(uploadable, size)
         d.addCallback(_got_size)
@@ -695,21 +775,41 @@ class LiteralUploader:
 
    def _build_results(self, uri):
        """Record the final URI on the results object, mark the status
        finished, and return the UploadResults."""
        self._results.uri = uri
        self._status.set_status("Done")
        # there is no separate ciphertext-push or encode-and-push work for
        # a literal upload, so mark progress axes 1 and 2 complete too
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results
 
     def close(self):
         pass
 
+    def get_upload_status(self):
+        return self._status
+
 class RemoteEncryptedUploadable(Referenceable):
     implements(RIEncryptedUploadable)
 
    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0        # current read position in the ciphertext
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None       # cached ciphertext size; see get_size()
+
+    def get_size(self):
+        if self._size is not None:
+            return defer.succeed(self._size)
+        d = self._eu.get_size()
+        def _got_size(size):
+            self._size = size
+            return size
+        d.addCallback(_got_size)
+        return d
 
     def remote_get_size(self):
-        return self._eu.get_size()
+        return self.get_size()
     def remote_get_all_encoding_parameters(self):
         return self._eu.get_all_encoding_parameters()
 
@@ -771,6 +871,9 @@ class AssistedUploader:
    def __init__(self, helper):
        # remotely-callable upload helper (used via callRemote later on)
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None      # filled in once the SI is computed
        # helper-assisted uploads report using_helper()=True on their status
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
 
     def log(self, msg, parent=None, **kwargs):
         if parent is None:
@@ -781,6 +884,7 @@ class AssistedUploader:
         self._started = time.time()
         u = IUploadable(uploadable)
         eu = EncryptAnUploadable(u)
+        eu.set_upload_status(self._upload_status)
         self._encuploadable = eu
         d = eu.get_size()
         d.addCallback(self._got_size)
@@ -800,6 +904,7 @@ class AssistedUploader:
 
     def _got_size(self, size):
         self._size = size
+        self._upload_status.set_size(size)
 
     def _got_all_encoding_parameters(self, params):
         k, happy, n, segment_size = params
@@ -819,6 +924,7 @@ class AssistedUploader:
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
         self.log("contacting helper..")
+        self._upload_status.set_status("Contacting Helper")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
         return d
@@ -829,16 +935,23 @@ class AssistedUploader:
         self._elapsed_time_contacting_helper = elapsed
         if upload_helper:
             self.log("helper says we need to upload")
+            self._upload_status.set_status("Uploading Ciphertext")
             # we need to upload the file
-            reu = RemoteEncryptedUploadable(self._encuploadable)
-            d = upload_helper.callRemote("upload", reu)
+            reu = RemoteEncryptedUploadable(self._encuploadable,
+                                            self._upload_status)
+            # let it pre-compute the size for progress purposes
+            d = reu.get_size()
+            d.addCallback(lambda ignored:
+                          upload_helper.callRemote("upload", reu))
             # this Deferred will fire with the upload results
             return d
         self.log("helper says file is already uploaded")
+        self._upload_status.set_progress(1, 1.0)
         return upload_results
 
     def _build_readcap(self, upload_results):
         self.log("upload finished, building readcap")
+        self._upload_status.set_status("Building Readcap")
         r = upload_results
         assert r.uri_extension_data["needed_shares"] == self._needed_shares
         assert r.uri_extension_data["total_shares"] == self._total_shares
@@ -858,8 +971,12 @@ class AssistedUploader:
         if "total" in r.timings:
             r.timings["helper_total"] = r.timings["total"]
         r.timings["total"] = now - self._started
+        self._upload_status.set_status("Done")
         return r
 
+    def get_upload_status(self):
+        return self._upload_status
+
 class BaseUploadable:
     default_max_segment_size = 1*MiB # overridden by max_segment_size
     default_encoding_param_k = 3 # overridden by encoding_parameters
@@ -872,6 +989,10 @@ class BaseUploadable:
     encoding_param_n = None
 
     _all_encoding_parameters = None
+    _status = None
+
+    def set_upload_status(self, upload_status):
+        self._status = IUploadStatus(upload_status)
 
     def set_default_encoding_parameters(self, default_params):
         assert isinstance(default_params, dict)
@@ -915,25 +1036,38 @@ class FileHandle(BaseUploadable):
         self._filehandle = filehandle
         self._key = None
         self._contenthashkey = contenthashkey
+        self._size = None
 
     def _get_encryption_key_content_hash(self):
         if self._key is not None:
             return defer.succeed(self._key)
 
-        d = self.get_all_encoding_parameters()
+        d = self.get_size()
+        # that sets self._size as a side-effect
+        d.addCallback(lambda size: self.get_all_encoding_parameters())
         def _got(params):
             k, happy, n, segsize = params
             f = self._filehandle
             enckey_hasher = content_hash_key_hasher(k, n, segsize)
             f.seek(0)
             BLOCKSIZE = 64*1024
+            bytes_read = 0
             while True:
                 data = f.read(BLOCKSIZE)
                 if not data:
                     break
                 enckey_hasher.update(data)
+                # TODO: setting progress in a non-yielding loop is kind of
+                # pointless, but I'm anticipating (perhaps prematurely) the
+                # day when we use a slowjob or twisted's CooperatorService to
+                # make this yield time to other jobs.
+                bytes_read += len(data)
+                if self._status:
+                    self._status.set_progress(0, float(bytes_read)/self._size)
             f.seek(0)
             self._key = enckey_hasher.digest()
+            if self._status:
+                self._status.set_progress(0, 1.0)
             assert len(self._key) == 16
             return self._key
         d.addCallback(_got)
@@ -951,8 +1085,11 @@ class FileHandle(BaseUploadable):
             return self._get_encryption_key_random()
 
     def get_size(self):
+        if self._size is not None:
+            return defer.succeed(self._size)
         self._filehandle.seek(0,2)
         size = self._filehandle.tell()
+        self._size = size
         self._filehandle.seek(0)
         return defer.succeed(size)
 
@@ -985,6 +1122,7 @@ class Uploader(service.MultiService):
     def __init__(self, helper_furl=None):
         self._helper_furl = helper_furl
         self._helper = None
+        self._all_uploads = weakref.WeakKeyDictionary()
         service.MultiService.__init__(self)
 
     def startService(self):
@@ -1021,6 +1159,7 @@ class Uploader(service.MultiService):
                 uploader = AssistedUploader(self._helper)
             else:
                 uploader = self.uploader_class(self.parent)
+            self._all_uploads[uploader.get_upload_status()] = None
             return uploader.start(uploadable)
         d.addCallback(_got_size)
         def _done(res):