UploadResults: store IServers internally, but still return serverids
authorBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:18:25 +0000 (21:18 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:18:25 +0000 (21:18 -0700)
This stores IDisplayableServer-providing instances (StubServers or
NativeStorageServers) in the .servermap and .sharemap dictionaries. But
get_servermap()/get_sharemap() still return data structures with
serverids, not IServers, by translating their data on the way out. This
lets us put off changing the callers for a little bit longer.

src/allmydata/immutable/upload.py
src/allmydata/test/test_helper.py

index 81f18a8fe256795a32daad4c7eab19957e531fb5..c609d37f4fa626bbca7fc8996ade1f45595aec9b 100644 (file)
@@ -64,8 +64,8 @@ class UploadResults:
                  ciphertext_fetched, # how much the helper fetched
                  preexisting_shares, # count of shares already present
                  pushed_shares, # count of shares we pushed
-                 sharemap, # {shnum: set(serverid)}
-                 servermap, # {serverid: set(shnum)}
+                 sharemap, # {shnum: set(server)}
+                 servermap, # {server: set(shnum)}
                  timings, # dict of name to number of seconds
                  uri_extension_data,
                  uri_extension_hash,
@@ -95,9 +95,17 @@ class UploadResults:
     def get_pushed_shares(self):
         return self._pushed_shares
     def get_sharemap(self):
-        return self._sharemap
+        # returns {shnum: set(serverid)}
+        sharemap = {}
+        for shnum, servers in self._sharemap.items():
+            sharemap[shnum] = set([s.get_serverid() for s in servers])
+        return sharemap
     def get_servermap(self):
-        return self._servermap
+        # returns {serverid: set(shnum)}
+        servermap = {}
+        for server, shnums in self._servermap.items():
+            servermap[server.get_serverid()] = set(shnums)
+        return servermap
     def get_timings(self):
         return self._timings
     def get_uri_extension_data(self):
@@ -144,6 +152,8 @@ class ServerTracker:
         return ("<ServerTracker for server %s and SI %s>"
                 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
 
+    def get_server(self):
+        return self._server
     def get_serverid(self):
         return self._server.get_serverid()
     def get_name(self):
@@ -1025,10 +1035,9 @@ class CHKUploader:
         sharemap = dictutil.DictOfSets()
         servermap = dictutil.DictOfSets()
         for shnum in e.get_shares_placed():
-            server_tracker = self._server_trackers[shnum]
-            serverid = server_tracker.get_serverid()
-            sharemap.add(shnum, serverid)
-            servermap.add(serverid, shnum)
+            server = self._server_trackers[shnum].get_server()
+            sharemap.add(shnum, server)
+            servermap.add(server, shnum)
         now = time.time()
         timings = {}
         timings["total"] = now - self._started
@@ -1187,8 +1196,9 @@ class RemoteEncryptedUploadable(Referenceable):
 
 class AssistedUploader:
 
-    def __init__(self, helper):
+    def __init__(self, helper, storage_broker):
         self._helper = helper
+        self._storage_broker = storage_broker
         self._log_number = log.msg("AssistedUploader starting")
         self._storage_index = None
         self._upload_status = s = UploadStatus()
@@ -1307,13 +1317,22 @@ class AssistedUploader:
         now = time.time()
         timings["total"] = now - self._started
 
+        gss = self._storage_broker.get_stub_server
+        sharemap = {}
+        servermap = {}
+        for shnum, serverids in hur.sharemap.items():
+            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
+        # if the file was already in the grid, hur.servermap is an empty dict
+        for serverid, shnums in hur.servermap.items():
+            servermap[gss(serverid)] = set(shnums)
+
         ur = UploadResults(file_size=self._size,
                            # not if already found
                            ciphertext_fetched=hur.ciphertext_fetched,
                            preexisting_shares=hur.preexisting_shares,
                            pushed_shares=hur.pushed_shares,
-                           sharemap=hur.sharemap,
-                           servermap=hur.servermap, # not if already found
+                           sharemap=sharemap,
+                           servermap=servermap,
                            timings=timings,
                            uri_extension_data=hur.uri_extension_data,
                            uri_extension_hash=hur.uri_extension_hash,
@@ -1554,8 +1573,9 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
             else:
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                 d2 = defer.succeed(None)
+                storage_broker = self.parent.get_storage_broker()
                 if self._helper:
-                    uploader = AssistedUploader(self._helper)
+                    uploader = AssistedUploader(self._helper, storage_broker)
                     d2.addCallback(lambda x: eu.get_storage_index())
                     d2.addCallback(lambda si: uploader.start(eu, si))
                 else:
index 909e3e5df894b619253134adafc5b9621013e399..e1fd54dbe853fde5e4bccfad0a49f7ea0f6dd7b4 100644 (file)
@@ -89,6 +89,8 @@ class FakeClient(service.MultiService):
 
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
+    def get_storage_broker(self):
+        return self.storage_broker
 
 def flush_but_dont_ignore(res):
     d = flushEventualQueue()
@@ -114,8 +116,8 @@ class AssistedUpload(unittest.TestCase):
     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
     def setUp(self):
         self.s = FakeClient()
-        self.storage_broker = StorageFarmBroker(None, True)
-        self.secret_holder = client.SecretHolder("lease secret", "convergence")
+        self.s.storage_broker = StorageFarmBroker(None, True)
+        self.s.secret_holder = client.SecretHolder("lease secret", "converge")
         self.s.startService()
 
         self.tub = t = Tub()
@@ -129,8 +131,8 @@ class AssistedUpload(unittest.TestCase):
     def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
         fileutil.make_dirs(basedir)
         self.helper = h = helper_class(basedir,
-                                       self.storage_broker,
-                                       self.secret_holder,
+                                       self.s.storage_broker,
+                                       self.s.secret_holder,
                                        None, None)
         self.helper_furl = self.tub.registerReference(h)