git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add upload-results timing info for helper uploads. This changes the Helper protocol...
author Brian Warner <warner@allmydata.com>
Wed, 6 Feb 2008 08:52:25 +0000 (01:52 -0700)
committer Brian Warner <warner@allmydata.com>
Wed, 6 Feb 2008 08:52:25 +0000 (01:52 -0700)
src/allmydata/encode.py
src/allmydata/interfaces.py
src/allmydata/offloaded.py
src/allmydata/test/test_helper.py
src/allmydata/upload.py
src/allmydata/web/unlinked-upload.xhtml
src/allmydata/webish.py

index 39f1e904e06975d233aad9c66b5398375038642f..1e41ef1202719ad46afced24661a4ccc8a075b4f 100644 (file)
@@ -669,10 +669,3 @@ class Encoder(object):
     def get_times(self):
         # return a dictionary of encode+push timings
         return self._times
-    def get_rates(self):
-        # return a dictionary of encode+push speeds
-        rates = {
-            "encode": self.file_size / self._times["cumulative_encoding"],
-            "push": self.file_size / self._times["cumulative_sending"],
-            }
-        return rates
index ae1fc65c7ae56cccd114829d2b2d291adb2c8853..0312ff0d8a0c4fb1a1bdc9412476bb373884def0 100644 (file)
@@ -1376,7 +1376,7 @@ class RIControlClient(RemoteInterface):
 
         return DictOf(Nodeid, float)
 
-UploadResults = DictOf(str, str)
+UploadResults = Any() #DictOf(str, str)
 
 class RIEncryptedUploadable(RemoteInterface):
     __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com"
index 4da21e2c4bd206f2a213da4583c424c832c92c50..7f2a34f4fe92a700bd5bc20deae45d804477614c 100644 (file)
@@ -1,5 +1,5 @@
 
-import os.path, stat
+import os.path, stat, time
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
@@ -131,13 +131,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
     def __init__(self, storage_index, helper,
                  incoming_file, encoding_file,
-                 log_number):
+                 results, log_number):
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
         upload_id = idlib.b2a(storage_index)[:6]
         self._log_number = log_number
+        self._results = results
         self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                          parent=log_number)
 
@@ -159,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         return upload.CHKUploader.log(self, *args, **kwargs)
 
     def start(self):
+        self._started = time.time()
         # determine if we need to upload the file. If so, return ({},self) .
         # If not, return (UploadResults,None) .
         self.log("deciding whether to upload the file or not", level=log.NOISY)
@@ -166,15 +168,15 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             # we have the whole file, and we might be encoding it (or the
             # encode/upload might have failed, and we need to restart it).
             self.log("ciphertext already in place", level=log.UNUSUAL)
-            return ({}, self)
+            return (self._results, self)
         if os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return ({}, self)
+            return (self._results, self)
         # we don't remember uploading this file
         self.log("no ciphertext yet", level=log.NOISY)
-        return ({}, self)
+        return (self._results, self)
 
     def remote_upload(self, reader):
         # reader is an RIEncryptedUploadable. I am specified to return an
@@ -190,10 +192,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
     def _finished(self, res):
         (uri_extension_hash, needed_shares, total_shares, size) = res
-        upload_results = {'uri_extension_hash': uri_extension_hash}
+        r = self._results
+        r.uri_extension_hash = uri_extension_hash
+        f_times = self._fetcher.get_times()
+        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
+        r.timings["total_fetch"] = f_times["total"]
         self._reader.close()
         os.unlink(self._encoding_file)
-        self._finished_observers.fire(upload_results)
+        self._finished_observers.fire(r)
         self._helper.upload_finished(self._storage_index)
         del self._reader
 
@@ -248,6 +254,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         self._readers = []
         self._started = False
         self._f = None
+        self._times = {
+            "cumulative_fetch": 0.0,
+            "total": 0.0,
+            }
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
@@ -264,6 +274,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         if self._started:
             return
         self._started = True
+        started = time.time()
 
         if os.path.exists(self._encoding_file):
             self.log("ciphertext already present, bypassing fetch",
@@ -276,7 +287,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
             # else.
             have = os.stat(self._encoding_file)[stat.ST_SIZE]
             d = self.call("read_encrypted", have-1, 1)
-            d.addCallback(lambda ignored: self._done2())
+            d.addCallback(self._done2, started)
             return
 
         # first, find out how large the file is going to be
@@ -284,6 +295,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         d.addCallback(self._got_size)
         d.addCallback(self._start_reading)
         d.addCallback(self._done)
+        d.addCallback(self._done2, started)
         d.addErrback(self._failed)
 
     def _got_size(self, size):
@@ -327,8 +339,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         # transfer that involves more than a few hundred chunks.
         # 'fire_when_done' lives a long time, but the Deferreds returned by
         # the inner _fetch() call do not.
+        start = time.time()
         d = defer.maybeDeferred(self._fetch)
         def _done(finished):
+            elapsed = time.time() - start
+            self._times["cumulative_fetch"] += elapsed
             if finished:
                 self.log("finished reading ciphertext", level=log.NOISY)
                 fire_when_done.callback(None)
@@ -366,10 +381,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
                  size=os.stat(self._incoming_file)[stat.ST_SIZE],
                  level=log.NOISY)
         os.rename(self._incoming_file, self._encoding_file)
-        return self._done2()
 
-    def _done2(self):
+    def _done2(self, _ignored, started):
         self.log("done2", level=log.NOISY)
+        elapsed = time.time() - started
+        self._times["total"] = elapsed
         self._readers = []
         self._done_observers.fire(None)
 
@@ -382,6 +398,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
     def when_done(self):
         return self._done_observers.when_fired()
 
+    def get_times(self):
+        return self._times
 
 
 class LocalCiphertextReader(AskUntilSuccessMixin):
@@ -449,6 +467,8 @@ class Helper(Referenceable, service.MultiService):
         return self.parent.log(*args, **kwargs)
 
     def remote_upload_chk(self, storage_index):
+        r = upload.UploadResults()
+        started = time.time()
         si_s = idlib.b2a(storage_index)
         lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
         incoming_file = os.path.join(self._chk_incoming, si_s)
@@ -458,11 +478,14 @@ class Helper(Referenceable, service.MultiService):
             uh = self._active_uploads[storage_index]
             return uh.start()
 
-        d = self._check_for_chk_already_in_grid(storage_index, lp)
-        def _checked(upload_results):
-            if upload_results:
+        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
+        def _checked(already_present):
+            elapsed = time.time() - started
+            r.timings['existence_check'] = elapsed
+            if already_present:
+                # the necessary results are placed in the UploadResults
                 self.log("file already found in grid", parent=lp)
-                return (upload_results, None)
+                return (r, None)
 
             # the file is not present in the grid, by which we mean there are
             # less than 'N' shares available.
@@ -477,7 +500,7 @@ class Helper(Referenceable, service.MultiService):
                 self.log("creating new upload helper", parent=lp)
                 uh = self.chk_upload_helper_class(storage_index, self,
                                                   incoming_file, encoding_file,
-                                                  lp)
+                                                  r, lp)
                 self._active_uploads[storage_index] = uh
             return uh.start()
         d.addCallback(_checked)
@@ -488,7 +511,7 @@ class Helper(Referenceable, service.MultiService):
         d.addErrback(_err)
         return d
 
-    def _check_for_chk_already_in_grid(self, storage_index, lp):
+    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -499,8 +522,8 @@ class Helper(Referenceable, service.MultiService):
             if res:
                 (sharemap, ueb_data, ueb_hash) = res
                 self.log("found file in grid", level=log.NOISY, parent=lp)
-                upload_results = {'uri_extension_hash': ueb_hash}
-                return upload_results
+                results.uri_extension_hash = ueb_hash
+                return True
             return False
         d.addCallback(_checked)
         return d
index 481664e9c701b035f6356e14d7eb5f74b1d9c62d..e1e8b0c6e401eb3443d70ec2b1f24d62e93a504b 100644 (file)
@@ -28,7 +28,8 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
 
 class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
     def start(self):
-        res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
+        res = upload.UploadResults()
+        res.uri_extension_hash = hashutil.uri_extension_hash("")
         return (res, None)
 
 class FakeClient(service.MultiService):
index 69a42c6047d74fcda91bd3f4e74b5aa9ef0a092c..72b27faaab2f447bbd97e7666f91213e642ab415 100644 (file)
@@ -4,7 +4,7 @@ from zope.interface import implements
 from twisted.python import failure
 from twisted.internet import defer
 from twisted.application import service
-from foolscap import Referenceable
+from foolscap import Referenceable, Copyable, RemoteCopy
 from foolscap import eventual
 from foolscap.logging import log
 
@@ -36,14 +36,17 @@ class HaveAllPeersError(Exception):
 class TooFullError(Exception):
     pass
 
-class UploadResults:
+class UploadResults(Copyable, RemoteCopy):
     implements(IUploadResults)
+    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
+    copytype = typeToCopy
+
+    file_size = None
     uri = None
     sharemap = None # dict of shnum to placement string
     servermap = None # dict of peerid to set(shnums)
     def __init__(self):
         self.timings = {} # dict of name to number of seconds
-        self.rates = {} # dict of name to rates (in bytes per second)
 
 # our current uri_extension is 846 bytes for small files, a few bytes
 # more for larger ones (since the filesize is encoded in decimal in a
@@ -597,15 +600,19 @@ class CHKUploader:
     def start_encrypted(self, encrypted):
         eu = IEncryptedUploadable(encrypted)
 
+        started = time.time()
         self._encoder = e = encode.Encoder(self._log_number)
         d = e.set_encrypted_uploadable(eu)
-        d.addCallback(self.locate_all_shareholders)
+        d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
         d.addCallback(lambda res: e.start())
+        d.addCallback(self._encrypted_done)
         # this fires with the uri_extension_hash and other data
         return d
 
-    def locate_all_shareholders(self, encoder):
+    def locate_all_shareholders(self, encoder, started):
+        peer_selection_started = now = time.time()
+        self._storage_index_elapsed = now - started
         storage_index = encoder.get_param("storage_index")
         upload_id = idlib.b2a(storage_index)[:6]
         self.log("using storage index %s" % upload_id)
@@ -621,7 +628,7 @@ class CHKUploader:
                                            share_size, block_size,
                                            num_segments, n, desired)
         def _done(res):
-            self._peer_selection_finished = time.time()
+            self._peer_selection_elapsed = time.time() - peer_selection_started
             return res
         d.addCallback(_done)
         return d
@@ -642,17 +649,8 @@ class CHKUploader:
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets)
 
-    def _compute_uri(self, (uri_extension_hash,
-                            needed_shares, total_shares, size),
-                     key):
-        u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=needed_shares,
-                           total_shares=total_shares,
-                           size=size,
-                           )
+    def _encrypted_done(self, res):
         r = self._results
-        r.uri = u.to_string()
         r.sharemap = {}
         r.servermap = {}
         for shnum in self._encoder.get_shares_placed():
@@ -663,14 +661,25 @@ class CHKUploader:
             if peerid not in r.servermap:
                 r.servermap[peerid] = set()
             r.servermap[peerid].add(shnum)
-        peer_selection_time = (self._peer_selection_finished
-                               - self._peer_selection_started)
         now = time.time()
+        r.file_size = self._encoder.file_size
         r.timings["total"] = now - self._started
-        r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"]
-        r.timings["peer_selection"] = peer_selection_time
+        r.timings["storage_index"] = self._storage_index_elapsed
+        r.timings["peer_selection"] = self._peer_selection_elapsed
         r.timings.update(self._encoder.get_times())
-        r.rates.update(self._encoder.get_rates())
+        return res
+
+    def _compute_uri(self, (uri_extension_hash,
+                            needed_shares, total_shares, size),
+                     key):
+        u = uri.CHKFileURI(key=key,
+                           uri_extension_hash=uri_extension_hash,
+                           needed_shares=needed_shares,
+                           total_shares=total_shares,
+                           size=size,
+                           )
+        r = self._results
+        r.uri = u.to_string()
         return r
 
 
@@ -703,7 +712,10 @@ class LiteralUploader:
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
-        d.addCallback(lambda size: read_this_many_bytes(uploadable, size))
+        def _got_size(size):
+            self._results.file_size = size
+            return read_this_many_bytes(uploadable, size)
+        d.addCallback(_got_size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
         d.addCallback(lambda u: u.to_string())
         d.addCallback(self._build_results)
@@ -794,7 +806,6 @@ class AssistedUploader:
         assert isinstance(default_encoding_parameters, dict)
         self._default_encoding_parameters = default_encoding_parameters
         self._log_number = log.msg("AssistedUploader starting")
-        self._results = UploadResults()
 
     def log(self, msg, parent=None, **kwargs):
         if parent is None:
@@ -838,16 +849,16 @@ class AssistedUploader:
         self._storage_index = storage_index
 
     def _contact_helper(self, res):
-        now = self._time_contacting_helper = time.time()
-        self._results.timings["local_hashing"] = now - self._started
+        now = self._time_contacting_helper_start = time.time()
+        self._storage_index_elapsed = now - self._started
         self.log("contacting helper..")
         d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
         return d
     def _contacted_helper(self, (upload_results, upload_helper)):
         now = time.time()
-        elapsed = now - self._time_contacting_helper
-        self._results.timings["contacting_helper"] = elapsed
+        elapsed = now - self._time_contacting_helper_start
+        self._elapsed_time_contacting_helper = elapsed
         if upload_helper:
             self.log("helper says we need to upload")
             # we need to upload the file
@@ -883,18 +894,21 @@ class AssistedUploader:
 
     def _build_readcap(self, upload_results):
         self.log("upload finished, building readcap")
-        ur = upload_results
+        r = upload_results
         u = uri.CHKFileURI(key=self._key,
-                           uri_extension_hash=ur['uri_extension_hash'],
+                           uri_extension_hash=r.uri_extension_hash,
                            needed_shares=self._needed_shares,
                            total_shares=self._total_shares,
                            size=self._size,
                            )
-        r = self._results
         r.uri = u.to_string()
         now = time.time()
+        r.file_size = self._size
+        r.timings["storage_index"] = self._storage_index_elapsed
+        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
+        if "total" in r.timings:
+            r.timings["helper_total"] = r.timings["total"]
         r.timings["total"] = now - self._started
-        r.rates["total"] = 1.0 * self._size / r.timings["total"]
         return r
 
 class NoParameterPreferencesMixin:
index 6f9149d3768f9c0b39c4bb52aa51b72e67a6b061..eb4283a7fbe97d0f93d0c569dc5516c1c4971f2e 100644 (file)
   <li>Servermap: <span n:render="servermap" /></li>
   <li>Timings:</li>
   <ul>
+    <li>File Size: <span n:render="string" n:data="file_size" /> bytes</li>
     <li>Total: <span n:render="time" n:data="time_total" />
      (<span n:render="rate" n:data="rate_total" />)</li>
     <ul>
+      <li>Storage Index: <span n:render="time" n:data="time_storage_index" />
+     (<span n:render="rate" n:data="rate_storage_index" />)</li>
       <li>Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
       <li>Encode And Push: <span n:render="time" n:data="time_total_encode_and_push" /></li>
       <ul>
index e5b8126a0042726a35db6a544b1bb50e604ec448..0bdf0d81c4b2d40de0b95caac93d875f1d118bc8 100644 (file)
@@ -1303,6 +1303,11 @@ class UnlinkedPOSTCHKUploader(rend.Page):
         d.addCallback(_render)
         return d
 
+    def data_file_size(self, ctx, data):
+        d = self.upload_results()
+        d.addCallback(lambda res: res.file_size)
+        return d
+
     def render_time(self, ctx, data):
         # 1.23s, 790ms, 132us
         if data is None:
@@ -1327,50 +1332,57 @@ class UnlinkedPOSTCHKUploader(rend.Page):
             return "%.1fkBps" % (r/1000)
         return "%dBps" % r
 
-    def data_time_total(self, ctx, data):
+    def _get_time(self, name):
         d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("total"))
+        d.addCallback(lambda res: res.timings.get(name))
         return d
 
+    def data_time_total(self, ctx, data):
+        return self._get_time("total")
+
+    def data_time_storage_index(self, ctx, data):
+        return self._get_time("storage_index")
+
     def data_time_peer_selection(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("peer_selection"))
-        return d
+        return self._get_time("peer_selection")
 
     def data_time_total_encode_and_push(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("total_encode_and_push"))
-        return d
+        return self._get_time("total_encode_and_push")
 
     def data_time_cumulative_encoding(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("cumulative_encoding"))
-        return d
+        return self._get_time("cumulative_encoding")
 
     def data_time_cumulative_sending(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("cumulative_sending"))
-        return d
+        return self._get_time("cumulative_sending")
 
     def data_time_hashes_and_close(self, ctx, data):
+        return self._get_time("hashes_and_close")
+
+    def _get_rate(self, name):
         d = self.upload_results()
-        d.addCallback(lambda res: res.timings.get("hashes_and_close"))
+        def _convert(r):
+            file_size = r.file_size
+            time = r.timings.get(name)
+            if time is None:
+                return None
+            try:
+                return 1.0 * file_size / time
+            except ZeroDivisionError:
+                return None
+        d.addCallback(_convert)
         return d
 
     def data_rate_total(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.rates.get("total"))
-        return d
+        return self._get_rate("total")
+
+    def data_rate_storage_index(self, ctx, data):
+        return self._get_rate("storage_index")
 
     def data_rate_encode(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.rates.get("encode"))
-        return d
+        return self._get_rate("cumulative_encoding")
 
     def data_rate_push(self, ctx, data):
-        d = self.upload_results()
-        d.addCallback(lambda res: res.rates.get("push"))
-        return d
+        return self._get_rate("cumulative_sending")
 
 
 class UnlinkedPOSTSSKUploader(rend.Page):