]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add HelperUploadResults
authorBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:14:00 +0000 (21:14 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:14:00 +0000 (21:14 -0700)
This splits the pb.Copyable on-wire object (HelperUploadResults) out
from the local results object (UploadResults). To maintain compatibility
with older Helpers, we have to leave pb.Copyable classes alone and
unmodified, but we want to change UploadResults to use IServers instead
of serverids. So by using a different class on the wire, and translating
to/from it on either end, we can accomplish both.

src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/test/test_helper.py

index 922750a948f7a499c93488faa18bc13cbfa79519..8faf516abd6513dbcf8c3e24314c795f1215ab61 100644 (file)
@@ -137,14 +137,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     def __init__(self, storage_index,
                  helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
-                 results, log_number):
+                 log_number):
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
         self._upload_id = si_b2a(storage_index)[:5]
         self._log_number = log_number
-        self._results = results
         self._upload_status = upload.UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_storage_index(storage_index)
@@ -201,19 +200,31 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # and inform the client when the upload has finished
         return self._finished_observers.when_fired()
 
-    def _finished(self, uploadresults):
-        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
-        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
-        r = uploadresults
-        v = uri.from_string(r.verifycapstr)
-        r.uri_extension_hash = v.uri_extension_hash
+    def _finished(self, ur):
+        precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr)
+        assert interfaces.IUploadResults.providedBy(ur), ur
+        v = uri.from_string(ur.verifycapstr)
         f_times = self._fetcher.get_times()
-        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
-        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
-        r.timings["total_fetch"] = f_times["total"]
+
+        hur = upload.HelperUploadResults()
+        hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
+                       "total_fetch": f_times["total"],
+                       }
+        for k in ur.timings:
+            hur.timings[k] = ur.timings[k]
+        hur.uri_extension_hash = v.uri_extension_hash
+        hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+        hur.preexisting_shares = ur.preexisting_shares
+        hur.sharemap = ur.sharemap
+        hur.servermap = ur.servermap
+        hur.pushed_shares = ur.pushed_shares
+        hur.file_size = ur.file_size
+        hur.uri_extension_data = ur.uri_extension_data
+        hur.verifycapstr = ur.verifycapstr
+
         self._reader.close()
         os.unlink(self._encoding_file)
-        self._finished_observers.fire(r)
+        self._finished_observers.fire(hur)
         self._helper.upload_finished(self._storage_index, v.size)
         del self._reader
 
@@ -561,7 +572,6 @@ class Helper(Referenceable):
 
     def remote_upload_chk(self, storage_index):
         self.count("chk_upload_helper.upload_requests")
-        r = upload.UploadResults()
         lp = self.log(format="helper: upload_chk query for SI %(si)s",
                       si=si_b2a(storage_index))
         if storage_index in self._active_uploads:
@@ -569,8 +579,8 @@ class Helper(Referenceable):
             uh = self._active_uploads[storage_index]
             return (None, uh)
 
-        d = self._check_chk(storage_index, r, lp)
-        d.addCallback(self._did_chk_check, storage_index, r, lp)
+        d = self._check_chk(storage_index, lp)
+        d.addCallback(self._did_chk_check, storage_index, lp)
         def _err(f):
             self.log("error while checking for chk-already-in-grid",
                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@@ -578,7 +588,7 @@ class Helper(Referenceable):
         d.addErrback(_err)
         return d
 
-    def _check_chk(self, storage_index, results, lp):
+    def _check_chk(self, storage_index, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -589,17 +599,18 @@ class Helper(Referenceable):
             if res:
                 (sharemap, ueb_data, ueb_hash) = res
                 self.log("found file in grid", level=log.NOISY, parent=lp)
-                results.uri_extension_hash = ueb_hash
-                results.sharemap = sharemap
-                results.uri_extension_data = ueb_data
-                results.preexisting_shares = len(sharemap)
-                results.pushed_shares = 0
-                return results
+                hur = upload.HelperUploadResults()
+                hur.uri_extension_hash = ueb_hash
+                hur.sharemap = sharemap
+                hur.uri_extension_data = ueb_data
+                hur.preexisting_shares = len(sharemap)
+                hur.pushed_shares = 0
+                return hur
             return None
         d.addCallback(_checked)
         return d
 
-    def _did_chk_check(self, already_present, storage_index, r, lp):
+    def _did_chk_check(self, already_present, storage_index, lp):
         if already_present:
             # the necessary results are placed in the UploadResults
             self.count("chk_upload_helper.upload_already_present")
@@ -618,12 +629,12 @@ class Helper(Referenceable):
             uh = self._active_uploads[storage_index]
         else:
             self.log("creating new upload helper", parent=lp)
-            uh = self._make_chk_upload_helper(storage_index, r, lp)
+            uh = self._make_chk_upload_helper(storage_index, lp)
             self._active_uploads[storage_index] = uh
             self._add_upload(uh)
         return (None, uh)
 
-    def _make_chk_upload_helper(self, storage_index, r, lp):
+    def _make_chk_upload_helper(self, storage_index, lp):
         si_s = si_b2a(storage_index)
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
@@ -631,7 +642,7 @@ class Helper(Referenceable):
                              self._storage_broker,
                              self._secret_holder,
                              incoming_file, encoding_file,
-                             r, lp)
+                             lp)
         return uh
 
     def _add_upload(self, uh):
index 9d33cb4299bbaf875469c0f115966704959d953c..958962ab75031b997266d5a66d12852b53209a8a 100644 (file)
@@ -32,8 +32,10 @@ from cStringIO import StringIO
 class TooFullError(Exception):
     pass
 
-class UploadResults(Copyable, RemoteCopy):
-    implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
     # note: don't change this string, it needs to match the value used on the
     # helper, and it does *not* need to match the fully-qualified
     # package/module/class name
@@ -55,6 +57,19 @@ class UploadResults(Copyable, RemoteCopy):
         self.preexisting_shares = None # count of shares already present
         self.pushed_shares = None # count of shares we pushed
 
+class UploadResults:
+    implements(IUploadResults)
+
+    def __init__(self):
+        self.timings = {} # dict of name to number of seconds
+        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
+        self.file_size = None
+        self.ciphertext_fetched = None # how much the helper fetched
+        self.uri = None
+        self.preexisting_shares = None # count of shares already present
+        self.pushed_shares = None # count of shares we pushed
+
 
 # our current uri_extension is 846 bytes for small files, a few bytes
 # more for larger ones (since the filesize is encoded in decimal in a
@@ -1179,7 +1194,7 @@ class AssistedUploader:
         d.addCallback(self._contacted_helper)
         return d
 
-    def _contacted_helper(self, (upload_results, upload_helper)):
+    def _contacted_helper(self, (helper_upload_results, upload_helper)):
         now = time.time()
         elapsed = now - self._time_contacting_helper_start
         self._elapsed_time_contacting_helper = elapsed
@@ -1197,7 +1212,7 @@ class AssistedUploader:
             return d
         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
         self._upload_status.set_progress(1, 1.0)
-        return upload_results
+        return helper_upload_results
 
     def _convert_old_upload_results(self, upload_results):
         # pre-1.3.0 helpers return upload results which contain a mapping
@@ -1216,30 +1231,41 @@ class AssistedUploader:
         if str in [type(v) for v in sharemap.values()]:
             upload_results.sharemap = None
 
-    def _build_verifycap(self, upload_results):
+    def _build_verifycap(self, helper_upload_results):
         self.log("upload finished, building readcap", level=log.OPERATIONAL)
-        self._convert_old_upload_results(upload_results)
+        self._convert_old_upload_results(helper_upload_results)
         self._upload_status.set_status("Building Readcap")
-        r = upload_results
-        assert r.uri_extension_data["needed_shares"] == self._needed_shares
-        assert r.uri_extension_data["total_shares"] == self._total_shares
-        assert r.uri_extension_data["segment_size"] == self._segment_size
-        assert r.uri_extension_data["size"] == self._size
-        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
-                                             uri_extension_hash=r.uri_extension_hash,
-                                             needed_shares=self._needed_shares,
-                                             total_shares=self._total_shares, size=self._size
-                                             ).to_string()
+        hur = helper_upload_results
+        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
+        assert hur.uri_extension_data["total_shares"] == self._total_shares
+        assert hur.uri_extension_data["segment_size"] == self._segment_size
+        assert hur.uri_extension_data["size"] == self._size
+        ur = UploadResults()
+        # hur.verifycap doesn't exist if already found
+        v = uri.CHKFileVerifierURI(self._storage_index,
+                                   uri_extension_hash=hur.uri_extension_hash,
+                                   needed_shares=self._needed_shares,
+                                   total_shares=self._total_shares,
+                                   size=self._size)
+        ur.verifycapstr = v.to_string()
+        ur.timings = hur.timings
+        ur.uri_extension_data = hur.uri_extension_data
+        ur.uri_extension_hash = hur.uri_extension_hash
+        ur.preexisting_shares = hur.preexisting_shares
+        ur.pushed_shares = hur.pushed_shares
+        ur.sharemap = hur.sharemap
+        ur.servermap = hur.servermap # not if already found
+        ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found
         now = time.time()
-        r.file_size = self._size
-        r.timings["storage_index"] = self._storage_index_elapsed
-        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
-        if "total" in r.timings:
-            r.timings["helper_total"] = r.timings["total"]
-        r.timings["total"] = now - self._started
+        ur.file_size = self._size
+        ur.timings["storage_index"] = self._storage_index_elapsed
+        ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
+        if "total" in ur.timings:
+            ur.timings["helper_total"] = ur.timings["total"]
+        ur.timings["total"] = now - self._started
         self._upload_status.set_status("Finished")
-        self._upload_status.set_results(r)
-        return r
+        self._upload_status.set_results(ur)
+        return ur
 
     def get_upload_status(self):
         return self._upload_status
index c0851f8cae3a6a5b75439d4d603befd927bdb007..6d7093b57a808e2c6547d3bca602e1946ce7ffed 100644 (file)
@@ -22,24 +22,31 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
         def _got_size(size):
             d2 = eu.get_all_encoding_parameters()
             def _got_parms(parms):
+                # just pretend we did the upload
                 needed_shares, happy, total_shares, segsize = parms
                 ueb_data = {"needed_shares": needed_shares,
                             "total_shares": total_shares,
                             "segment_size": segsize,
                             "size": size,
                             }
-                self._results.uri_extension_data = ueb_data
-                self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
-                                                                 needed_shares, total_shares,
-                                                                 size).to_string()
-                return self._results
+
+                r = upload.UploadResults()
+                r.preexisting_shares = 0
+                r.pushed_shares = total_shares
+                r.file_size = size
+                r.uri_extension_data = ueb_data
+                v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+                                           needed_shares, total_shares,
+                                           size)
+                r.verifycapstr = v.to_string()
+                return r
             d2.addCallback(_got_parms)
             return d2
         d.addCallback(_got_size)
         return d
 
 class Helper_fake_upload(offloaded.Helper):
-    def _make_chk_upload_helper(self, storage_index, r, lp):
+    def _make_chk_upload_helper(self, storage_index, lp):
         si_s = si_b2a(storage_index)
         incoming_file = os.path.join(self._chk_incoming, si_s)
         encoding_file = os.path.join(self._chk_encoding, si_s)
@@ -47,12 +54,12 @@ class Helper_fake_upload(offloaded.Helper):
                                   self._storage_broker,
                                   self._secret_holder,
                                   incoming_file, encoding_file,
-                                  r, lp)
+                                  lp)
         return uh
 
 class Helper_already_uploaded(Helper_fake_upload):
-    def _check_chk(self, storage_index, results, lp):
-        res = upload.UploadResults()
+    def _check_chk(self, storage_index, lp):
+        res = upload.HelperUploadResults()
         res.uri_extension_hash = hashutil.uri_extension_hash("")
 
         # we're pretending that the file they're trying to upload was already