]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/offloaded.py
add upload-results timing info for helper uploads. This changes the Helper protocol...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / offloaded.py
index 4da21e2c4bd206f2a213da4583c424c832c92c50..7f2a34f4fe92a700bd5bc20deae45d804477614c 100644 (file)
@@ -1,5 +1,5 @@
 
-import os.path, stat
+import os.path, stat, time
 from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
@@ -131,13 +131,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
     def __init__(self, storage_index, helper,
                  incoming_file, encoding_file,
-                 log_number):
+                 results, log_number):
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
         upload_id = idlib.b2a(storage_index)[:6]
         self._log_number = log_number
+        self._results = results
         self._helper.log("CHKUploadHelper starting for SI %s" % upload_id,
                          parent=log_number)
 
@@ -159,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         return upload.CHKUploader.log(self, *args, **kwargs)
 
     def start(self):
+        self._started = time.time()
         # determine if we need to upload the file. If so, return ({},self) .
         # If not, return (UploadResults,None) .
         self.log("deciding whether to upload the file or not", level=log.NOISY)
@@ -166,15 +168,15 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             # we have the whole file, and we might be encoding it (or the
             # encode/upload might have failed, and we need to restart it).
             self.log("ciphertext already in place", level=log.UNUSUAL)
-            return ({}, self)
+            return (self._results, self)
         if os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return ({}, self)
+            return (self._results, self)
         # we don't remember uploading this file
         self.log("no ciphertext yet", level=log.NOISY)
-        return ({}, self)
+        return (self._results, self)
 
     def remote_upload(self, reader):
         # reader is an RIEncryptedUploadable. I am specified to return an
@@ -190,10 +192,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
     def _finished(self, res):
         (uri_extension_hash, needed_shares, total_shares, size) = res
-        upload_results = {'uri_extension_hash': uri_extension_hash}
+        r = self._results
+        r.uri_extension_hash = uri_extension_hash
+        f_times = self._fetcher.get_times()
+        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
+        r.timings["total_fetch"] = f_times["total"]
         self._reader.close()
         os.unlink(self._encoding_file)
-        self._finished_observers.fire(upload_results)
+        self._finished_observers.fire(r)
         self._helper.upload_finished(self._storage_index)
         del self._reader
 
@@ -248,6 +254,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         self._readers = []
         self._started = False
         self._f = None
+        self._times = {
+            "cumulative_fetch": 0.0,
+            "total": 0.0,
+            }
 
     def log(self, *args, **kwargs):
         if "facility" not in kwargs:
@@ -264,6 +274,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         if self._started:
             return
         self._started = True
+        started = time.time()
 
         if os.path.exists(self._encoding_file):
             self.log("ciphertext already present, bypassing fetch",
@@ -276,7 +287,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
             # else.
             have = os.stat(self._encoding_file)[stat.ST_SIZE]
             d = self.call("read_encrypted", have-1, 1)
-            d.addCallback(lambda ignored: self._done2())
+            d.addCallback(self._done2, started)
             return
 
         # first, find out how large the file is going to be
@@ -284,6 +295,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         d.addCallback(self._got_size)
         d.addCallback(self._start_reading)
         d.addCallback(self._done)
+        d.addCallback(self._done2, started)
         d.addErrback(self._failed)
 
     def _got_size(self, size):
@@ -327,8 +339,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         # transfer that involves more than a few hundred chunks.
         # 'fire_when_done' lives a long time, but the Deferreds returned by
         # the inner _fetch() call do not.
+        start = time.time()
         d = defer.maybeDeferred(self._fetch)
         def _done(finished):
+            elapsed = time.time() - start
+            self._times["cumulative_fetch"] += elapsed
             if finished:
                 self.log("finished reading ciphertext", level=log.NOISY)
                 fire_when_done.callback(None)
@@ -366,10 +381,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
                  size=os.stat(self._incoming_file)[stat.ST_SIZE],
                  level=log.NOISY)
         os.rename(self._incoming_file, self._encoding_file)
-        return self._done2()
 
-    def _done2(self):
+    def _done2(self, _ignored, started):
         self.log("done2", level=log.NOISY)
+        elapsed = time.time() - started
+        self._times["total"] = elapsed
         self._readers = []
         self._done_observers.fire(None)
 
@@ -382,6 +398,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
     def when_done(self):
         return self._done_observers.when_fired()
 
+    def get_times(self):
+        return self._times
 
 
 class LocalCiphertextReader(AskUntilSuccessMixin):
@@ -449,6 +467,8 @@ class Helper(Referenceable, service.MultiService):
         return self.parent.log(*args, **kwargs)
 
     def remote_upload_chk(self, storage_index):
+        r = upload.UploadResults()
+        started = time.time()
         si_s = idlib.b2a(storage_index)
         lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
         incoming_file = os.path.join(self._chk_incoming, si_s)
@@ -458,11 +478,14 @@ class Helper(Referenceable, service.MultiService):
             uh = self._active_uploads[storage_index]
             return uh.start()
 
-        d = self._check_for_chk_already_in_grid(storage_index, lp)
-        def _checked(upload_results):
-            if upload_results:
+        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
+        def _checked(already_present):
+            elapsed = time.time() - started
+            r.timings['existence_check'] = elapsed
+            if already_present:
+                # the necessary results are placed in the UploadResults
                 self.log("file already found in grid", parent=lp)
-                return (upload_results, None)
+                return (r, None)
 
             # the file is not present in the grid, by which we mean there are
             # less than 'N' shares available.
@@ -477,7 +500,7 @@ class Helper(Referenceable, service.MultiService):
                 self.log("creating new upload helper", parent=lp)
                 uh = self.chk_upload_helper_class(storage_index, self,
                                                   incoming_file, encoding_file,
-                                                  lp)
+                                                  r, lp)
                 self._active_uploads[storage_index] = uh
             return uh.start()
         d.addCallback(_checked)
@@ -488,7 +511,7 @@ class Helper(Referenceable, service.MultiService):
         d.addErrback(_err)
         return d
 
-    def _check_for_chk_already_in_grid(self, storage_index, lp):
+    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -499,8 +522,8 @@ class Helper(Referenceable, service.MultiService):
             if res:
                 (sharemap, ueb_data, ueb_hash) = res
                 self.log("found file in grid", level=log.NOISY, parent=lp)
-                upload_results = {'uri_extension_hash': ueb_hash}
-                return upload_results
+                results.uri_extension_hash = ueb_hash
+                return True
             return False
         d.addCallback(_checked)
         return d