From b71234c538701246d0466f41d63731898e873f5c Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 21 May 2012 21:14:00 -0700
Subject: [PATCH] add HelperUploadResults

This splits the pb.Copyable on-wire object (HelperUploadResults) out
from the local results object (UploadResults). To maintain compatibility
with older Helpers, we have to leave pb.Copyable classes alone and
unmodified, but we want to change UploadResults to use IServers instead
of serverids. So by using a different class on the wire, and translating
to/from it on either end, we can accomplish both.
---
 src/allmydata/immutable/offloaded.py | 63 +++++++++++++----------
 src/allmydata/immutable/upload.py    | 74 +++++++++++++++++++---------
 src/allmydata/test/test_helper.py    | 25 ++++++----
 3 files changed, 103 insertions(+), 59 deletions(-)

diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py
index 922750a9..8faf516a 100644
--- a/src/allmydata/immutable/offloaded.py
+++ b/src/allmydata/immutable/offloaded.py
@@ -137,14 +137,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     def __init__(self, storage_index,
                  helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
-                 results, log_number):
+                 log_number):
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
         self._upload_id = si_b2a(storage_index)[:5]
         self._log_number = log_number
-        self._results = results
         self._upload_status = upload.UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_storage_index(storage_index)
@@ -201,19 +200,31 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # and inform the client when the upload has finished
         return self._finished_observers.when_fired()
 
-    def _finished(self, uploadresults):
-        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
-        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
-        r = uploadresults
-        v = uri.from_string(r.verifycapstr)
-        r.uri_extension_hash = v.uri_extension_hash
+    def _finished(self, ur):
+        precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr)
+        assert interfaces.IUploadResults.providedBy(ur), ur
+        v = uri.from_string(ur.verifycapstr)
         f_times = self._fetcher.get_times()
-        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
-        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
-        r.timings["total_fetch"] = f_times["total"]
+
+        hur = upload.HelperUploadResults()
+        hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
+                       "total_fetch": f_times["total"],
+                       }
+        for k in ur.timings:
+            hur.timings[k] = ur.timings[k]
+        hur.uri_extension_hash = v.uri_extension_hash
+        hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+        hur.preexisting_shares = ur.preexisting_shares
+        hur.sharemap = ur.sharemap
+        hur.servermap = ur.servermap
+        hur.pushed_shares = ur.pushed_shares
+        hur.file_size = ur.file_size
+        hur.uri_extension_data = ur.uri_extension_data
+        hur.verifycapstr = ur.verifycapstr
+
         self._reader.close()
         os.unlink(self._encoding_file)
-        self._finished_observers.fire(r)
+        self._finished_observers.fire(hur)
         self._helper.upload_finished(self._storage_index, v.size)
         del self._reader
 
@@ -561,7 +572,6 @@ class Helper(Referenceable):
 
     def remote_upload_chk(self, storage_index):
         self.count("chk_upload_helper.upload_requests")
-        r = upload.UploadResults()
         lp = self.log(format="helper: upload_chk query for SI %(si)s",
                       si=si_b2a(storage_index))
         if storage_index in self._active_uploads:
@@ -569,8 +579,8 @@ class Helper(Referenceable):
             uh = self._active_uploads[storage_index]
             return (None, uh)
 
-        d = self._check_chk(storage_index, r, lp)
-        d.addCallback(self._did_chk_check, storage_index, r, lp)
+        d = self._check_chk(storage_index, lp)
+        d.addCallback(self._did_chk_check, storage_index, lp)
         def _err(f):
             self.log("error while checking for chk-already-in-grid",
                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@@ -578,7 +588,7 @@ class Helper(Referenceable):
         d.addErrback(_err)
         return d
 
-    def _check_chk(self, storage_index, results, lp):
+    def _check_chk(self, storage_index, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -589,17 +599,18 @@ class Helper(Referenceable):
             if res:
                 (sharemap, ueb_data, ueb_hash) = res
                 self.log("found file in grid", level=log.NOISY, parent=lp)
-                results.uri_extension_hash = ueb_hash
-                results.sharemap = sharemap
-                results.uri_extension_data = ueb_data
-                results.preexisting_shares = len(sharemap)
-                results.pushed_shares = 0
-                return results
+                hur = upload.HelperUploadResults()
+                hur.uri_extension_hash = ueb_hash
+                hur.sharemap = sharemap
+                hur.uri_extension_data = ueb_data
+                hur.preexisting_shares = len(sharemap)
+                hur.pushed_shares = 0
+                return hur
             return None
         d.addCallback(_checked)
         return d
 
-    def _did_chk_check(self, already_present, storage_index, r, lp):
+    def _did_chk_check(self, already_present, storage_index, lp):
         if already_present:
             # the necessary results are placed in the UploadResults
             self.count("chk_upload_helper.upload_already_present")
@@ -618,12 +629,12 @@ class Helper(Referenceable):
             uh = self._active_uploads[storage_index]
         else:
             self.log("creating new upload helper", parent=lp)
-            uh = self._make_chk_upload_helper(storage_index, r, lp)
+            uh = self._make_chk_upload_helper(storage_index, lp)
             self._active_uploads[storage_index] = uh
             self._add_upload(uh)
         return (None, uh)
 
-    def _make_chk_upload_helper(self, storage_index, r, lp):
+    def _make_chk_upload_helper(self, storage_index, lp):
         si_s = si_b2a(storage_index)
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
@@ -631,7 +642,7 @@ class Helper(Referenceable):
                              self._storage_broker,
                              self._secret_holder,
                              incoming_file, encoding_file,
-                             r, lp)
+                             lp)
         return uh
 
     def _add_upload(self, uh):
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index 9d33cb42..958962ab 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -32,8 +32,10 @@ from cStringIO import StringIO
 class TooFullError(Exception):
     pass
 
-class UploadResults(Copyable, RemoteCopy):
-    implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
     # note: don't change this string, it needs to match the value used on the
     # helper, and it does *not* need to match the fully-qualified
     # package/module/class name
@@ -55,6 +57,19 @@ class UploadResults(Copyable, RemoteCopy):
         self.preexisting_shares = None # count of shares already present
         self.pushed_shares = None # count of shares we pushed
 
+class UploadResults:
+    implements(IUploadResults)
+
+    def __init__(self):
+        self.timings = {} # dict of name to number of seconds
+        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
+        self.file_size = None
+        self.ciphertext_fetched = None # how much the helper fetched
+        self.uri = None
+        self.preexisting_shares = None # count of shares already present
+        self.pushed_shares = None # count of shares we pushed
+
 
 # our current uri_extension is 846 bytes for small files, a few bytes
 # more for larger ones (since the filesize is encoded in decimal in a
@@ -1179,7 +1194,7 @@ class AssistedUploader:
         d.addCallback(self._contacted_helper)
         return d
 
-    def _contacted_helper(self, (upload_results, upload_helper)):
+    def _contacted_helper(self, (helper_upload_results, upload_helper)):
         now = time.time()
         elapsed = now - self._time_contacting_helper_start
         self._elapsed_time_contacting_helper = elapsed
@@ -1197,7 +1212,7 @@ class AssistedUploader:
             return d
         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
         self._upload_status.set_progress(1, 1.0)
-        return upload_results
+        return helper_upload_results
 
     def _convert_old_upload_results(self, upload_results):
         # pre-1.3.0 helpers return upload results which contain a mapping
@@ -1216,30 +1231,41 @@ class AssistedUploader:
         if str in [type(v) for v in sharemap.values()]:
             upload_results.sharemap = None
 
-    def _build_verifycap(self, upload_results):
+    def _build_verifycap(self, helper_upload_results):
         self.log("upload finished, building readcap", level=log.OPERATIONAL)
-        self._convert_old_upload_results(upload_results)
+        self._convert_old_upload_results(helper_upload_results)
         self._upload_status.set_status("Building Readcap")
-        r = upload_results
-        assert r.uri_extension_data["needed_shares"] == self._needed_shares
-        assert r.uri_extension_data["total_shares"] == self._total_shares
-        assert r.uri_extension_data["segment_size"] == self._segment_size
-        assert r.uri_extension_data["size"] == self._size
-        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
-                                             uri_extension_hash=r.uri_extension_hash,
-                                             needed_shares=self._needed_shares,
-                                             total_shares=self._total_shares, size=self._size
-                                             ).to_string()
+        hur = helper_upload_results
+        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
+        assert hur.uri_extension_data["total_shares"] == self._total_shares
+        assert hur.uri_extension_data["segment_size"] == self._segment_size
+        assert hur.uri_extension_data["size"] == self._size
+        ur = UploadResults()
+        # hur.verifycap doesn't exist if already found
+        v = uri.CHKFileVerifierURI(self._storage_index,
+                                   uri_extension_hash=hur.uri_extension_hash,
+                                   needed_shares=self._needed_shares,
+                                   total_shares=self._total_shares,
+                                   size=self._size)
+        ur.verifycapstr = v.to_string()
+        ur.timings = hur.timings
+        ur.uri_extension_data = hur.uri_extension_data
+        ur.uri_extension_hash = hur.uri_extension_hash
+        ur.preexisting_shares = hur.preexisting_shares
+        ur.pushed_shares = hur.pushed_shares
+        ur.sharemap = hur.sharemap
+        ur.servermap = hur.servermap # not if already found
+        ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found
         now = time.time()
-        r.file_size = self._size
-        r.timings["storage_index"] = self._storage_index_elapsed
-        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
-        if "total" in r.timings:
-            r.timings["helper_total"] = r.timings["total"]
-        r.timings["total"] = now - self._started
+        ur.file_size = self._size
+        ur.timings["storage_index"] = self._storage_index_elapsed
+        ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
+        if "total" in ur.timings:
+            ur.timings["helper_total"] = ur.timings["total"]
+        ur.timings["total"] = now - self._started
         self._upload_status.set_status("Finished")
-        self._upload_status.set_results(r)
-        return r
+        self._upload_status.set_results(ur)
+        return ur
 
     def get_upload_status(self):
         return self._upload_status
diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py
index c0851f8c..6d7093b5 100644
--- a/src/allmydata/test/test_helper.py
+++ b/src/allmydata/test/test_helper.py
@@ -22,24 +22,31 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
         def _got_size(size):
             d2 = eu.get_all_encoding_parameters()
             def _got_parms(parms):
+                # just pretend we did the upload
                 needed_shares, happy, total_shares, segsize = parms
                 ueb_data = {"needed_shares": needed_shares,
                             "total_shares": total_shares,
                             "segment_size": segsize,
                             "size": size,
                             }
-                self._results.uri_extension_data = ueb_data
-                self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
-                                                                 needed_shares, total_shares,
-                                                                 size).to_string()
-                return self._results
+
+                r = upload.UploadResults()
+                r.preexisting_shares = 0
+                r.pushed_shares = total_shares
+                r.file_size = size
+                r.uri_extension_data = ueb_data
+                v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+                                           needed_shares, total_shares,
+                                           size)
+                r.verifycapstr = v.to_string()
+                return r
             d2.addCallback(_got_parms)
             return d2
         d.addCallback(_got_size)
         return d
 
 class Helper_fake_upload(offloaded.Helper):
-    def _make_chk_upload_helper(self, storage_index, r, lp):
+    def _make_chk_upload_helper(self, storage_index, lp):
         si_s = si_b2a(storage_index)
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
@@ -47,12 +54,12 @@ class Helper_fake_upload(offloaded.Helper):
                                   self._storage_broker,
                                   self._secret_holder,
                                   incoming_file, encoding_file,
-                                  r, lp)
+                                  lp)
         return uh
 
 class Helper_already_uploaded(Helper_fake_upload):
-    def _check_chk(self, storage_index, results, lp):
-        res = upload.UploadResults()
+    def _check_chk(self, storage_index, lp):
+        res = upload.HelperUploadResults()
         res.uri_extension_hash = hashutil.uri_extension_hash("")
 
         # we're pretending that the file they're trying to upload was already
-- 
2.45.2