Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness
author Kevan Carstensen <kevan@isnotajoke.com>
Wed, 4 Nov 2009 12:12:22 +0000 (05:12 -0700)
committer Kevan Carstensen <kevan@isnotajoke.com>
Wed, 4 Nov 2009 12:12:22 +0000 (05:12 -0700)
src/allmydata/immutable/encode.py
src/allmydata/immutable/upload.py

index bb0b79ee62326adb9d005001767e3b363d8ab5b0..c3ff0d21899f115c75636c2dd519b4b71aa0e391 100644 (file)
@@ -118,7 +118,7 @@ class Encoder(object):
         assert not self._codec
         k, happy, n, segsize = params
         self.required_shares = k
-        self.shares_of_happiness = happy
+        self.servers_of_happiness = happy
         self.num_shares = n
         self.segment_size = segsize
         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
@@ -176,7 +176,7 @@ class Encoder(object):
         if name == "storage_index":
             return self._storage_index
         elif name == "share_counts":
-            return (self.required_shares, self.shares_of_happiness,
+            return (self.required_shares, self.servers_of_happiness,
                     self.num_shares)
         elif name == "num_segments":
             return self.num_segments
@@ -191,11 +191,13 @@ class Encoder(object):
         else:
             raise KeyError("unknown parameter name '%s'" % name)
 
-    def set_shareholders(self, landlords):
+    def set_shareholders(self, landlords, servermap):
         assert isinstance(landlords, dict)
         for k in landlords:
             assert IStorageBucketWriter.providedBy(landlords[k])
         self.landlords = landlords.copy()
+        assert isinstance(servermap, dict)
+        self.servermap = servermap.copy()
 
     def start(self):
         """ Returns a Deferred that will fire with the verify cap (an instance of
@@ -486,16 +488,19 @@ class Encoder(object):
             # even more UNUSUAL
             self.log("they weren't in our list of landlords", parent=ln,
                      level=log.WEIRD, umid="TQGFRw")
-        if len(self.landlords) < self.shares_of_happiness:
-            msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \
-                  (len(self.landlords), self.shares_of_happiness, why)
-            if self.landlords:
+        del(self.servermap[shareid])
+        servers_left = list(set(self.servermap.values()))
+        if len(servers_left) < self.servers_of_happiness:
+            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
+                  (len(servers_left),
+                   self.servers_of_happiness, why)
+            if servers_left:
                 raise NotEnoughSharesError(msg)
             else:
                 raise NoSharesError(msg)
         self.log("but we can still continue with %s shares, we'll be happy "
-                 "with at least %s" % (len(self.landlords),
-                                       self.shares_of_happiness),
+                 "with at least %s" % (len(servers_left),
+                                       self.servers_of_happiness),
                  parent=ln)
 
     def _gather_responses(self, dl):
index 72826994588dbd89e61a28ac81e94f93463584a8..00efceaacded4215529b9086c0d2d0b67d40407b 100644 (file)
@@ -128,6 +128,23 @@ class PeerTracker:
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
 
+def servers_with_unique_shares(existing_shares, used_peers=None):
+    servers = []
+    if used_peers:
+        peers = list(used_peers.copy())
+        # We do this because the preexisting shares list goes by peerid.
+        peers = [x.peerid for x in peers]
+        servers.extend(peers)
+    servers.extend(existing_shares.values())
+    return list(set(servers))
+
+def shares_by_server(existing_shares):
+    servers = {}
+    for server in set(existing_shares.values()):
+        servers[server] = set([x for x in existing_shares.keys()
+                               if existing_shares[x] == server])
+    return servers
+
 class Tahoe2PeerSelector:
 
     def __init__(self, upload_id, logparent=None, upload_status=None):
@@ -144,7 +161,7 @@ class Tahoe2PeerSelector:
 
     def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
-                         num_segments, total_shares, shares_of_happiness):
+                         num_segments, total_shares, servers_of_happiness):
         """
         @return: (used_peers, already_peers), where used_peers is a set of
                  PeerTracker instances that have agreed to hold some shares
@@ -157,7 +174,7 @@ class Tahoe2PeerSelector:
             self._status.set_status("Contacting Peers..")
 
         self.total_shares = total_shares
-        self.shares_of_happiness = shares_of_happiness
+        self.servers_of_happiness = servers_of_happiness
 
         self.homeless_shares = range(total_shares)
         # self.uncontacted_peers = list() # peers we haven't asked yet
@@ -222,20 +239,52 @@ class Tahoe2PeerSelector:
         d = defer.maybeDeferred(self._loop)
         return d
 
+
     def _loop(self):
         if not self.homeless_shares:
-            # all done
-            msg = ("placed all %d shares, "
-                   "sent %d queries to %d peers, "
-                   "%d queries placed some shares, %d placed none, "
-                   "got %d errors" %
-                   (self.total_shares,
-                    self.query_count, self.num_peers_contacted,
-                    self.good_query_count, self.bad_query_count,
-                    self.error_count))
-            log.msg("peer selection successful for %s: %s" % (self, msg),
+            effective_happiness = servers_with_unique_shares(
+                                                   self.preexisting_shares,
+                                                   self.use_peers)
+            if self.servers_of_happiness <= len(effective_happiness):
+                msg = ("placed all %d shares, "
+                       "sent %d queries to %d peers, "
+                       "%d queries placed some shares, %d placed none, "
+                       "got %d errors" %
+                       (self.total_shares,
+                        self.query_count, self.num_peers_contacted,
+                        self.good_query_count, self.bad_query_count,
+                        self.error_count))
+                log.msg("peer selection successful for %s: %s" % (self, msg),
                     parent=self._log_parent)
-            return (self.use_peers, self.preexisting_shares)
+                return (self.use_peers, self.preexisting_shares)
+            else:
+                delta = self.servers_of_happiness - len(effective_happiness)
+                shares = shares_by_server(self.preexisting_shares)
+                # Each server in shares maps to a set of shares stored on it.
+                # Since we want to keep at least one share on each server 
+                # that has one (otherwise we'd only be making
+                # the situation worse by removing distinct servers),
+                # each server has len(its shares) - 1 to spread around.
+                shares_to_spread = sum([len(list(sharelist)) - 1
+                                        for (server, sharelist)
+                                        in shares.items()])
+                if delta <= len(self.uncontacted_peers) and \
+                   shares_to_spread >= delta:
+                    # Loop through the allocated shares, removing extras to make them homeless.
+                    items = shares.items()
+                    while len(self.homeless_shares) < delta:
+                        servernum, sharelist = items.pop()
+                        if len(sharelist) > 1:
+                            share = sharelist.pop()
+                            self.homeless_shares.append(share)
+                            del(self.preexisting_shares[share])
+                            items.append((servernum, sharelist))
+                    return self._loop()
+                else:
+                    raise NotEnoughSharesError("shares could only be placed on %d "
+                                            "servers (%d were requested)" %
+                                            (len(effective_happiness),
+                                             self.servers_of_happiness))
 
         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
@@ -284,15 +333,18 @@ class Tahoe2PeerSelector:
         else:
             # no more peers. If we haven't placed enough shares, we fail.
             placed_shares = self.total_shares - len(self.homeless_shares)
-            if placed_shares < self.shares_of_happiness:
+            effective_happiness = servers_with_unique_shares(
+                                                   self.preexisting_shares,
+                                                   self.use_peers)
+            if len(effective_happiness) < self.servers_of_happiness:
                 msg = ("placed %d shares out of %d total (%d homeless), "
-                       "want to place %d, "
+                       "want to place on %d servers, "
                        "sent %d queries to %d peers, "
                        "%d queries placed some shares, %d placed none, "
                        "got %d errors" %
                        (self.total_shares - len(self.homeless_shares),
                         self.total_shares, len(self.homeless_shares),
-                        self.shares_of_happiness,
+                        self.servers_of_happiness,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.error_count))
@@ -339,6 +391,12 @@ class Tahoe2PeerSelector:
                     level=log.NOISY, parent=self._log_parent)
             progress = False
             for s in alreadygot:
+                if self.preexisting_shares.has_key(s):
+                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
+                    new_candidate = self.preexisting_shares.copy()
+                    new_candidate[s] = peer.peerid
+                    new_size = len(servers_with_unique_shares(new_candidate))
+                    if old_size >= new_size: continue
                 self.preexisting_shares[s] = peer.peerid
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
@@ -764,12 +822,14 @@ class CHKUploader:
         for peer in used_peers:
             assert isinstance(peer, PeerTracker)
         buckets = {}
+        servermap = already_peers.copy()
         for peer in used_peers:
             buckets.update(peer.buckets)
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
+                servermap[shnum] = peer.peerid
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
-        encoder.set_shareholders(buckets)
+        encoder.set_shareholders(buckets, servermap)
 
     def _encrypted_done(self, verifycap):
         """ Returns a Deferred that will fire with the UploadResults instance. """