Let Uploader retain History instead of passing it into upload(). Fixes #1079.
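In short, the History object moves from a per-call argument to a constructor argument, so callers stop threading it through every upload() call. A hedged before/after sketch of the calling convention, with hypothetical variable names:

    # Before this patch:
    #   uploader = Uploader(helper_furl, stats_provider)
    #   d = uploader.upload(uploadable, history=history)
    # After this patch:
    uploader = Uploader(helper_furl, stats_provider, history=history)
    d = uploader.upload(uploadable)  # status is recorded via self._history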
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index b36a435ce67ed7ec70e35dfd32327c9f10cf15d0..013aca97cbc6a9ce2c221531cc9e444f5bfdd33e 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -14,7 +14,7 @@ from allmydata.storage.server import si_b2a
 from allmydata.immutable import encode
 from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.happinessutil import servers_of_happiness, \
-                                         shares_by_server, merge_peers, \
+                                         shares_by_server, merge_servers, \
                                          failure_message
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import add_version_to_remote_reference
@@ -69,21 +69,18 @@ def pretty_print_shnum_to_servers(s):
     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
 
 class ServerTracker:
-    def __init__(self, serverid, storage_server,
+    def __init__(self, server,
                  sharesize, blocksize, num_segments, num_share_hashes,
                  storage_index,
                  bucket_renewal_secret, bucket_cancel_secret):
-        precondition(isinstance(serverid, str), serverid)
-        precondition(len(serverid) == 20, serverid)
-        self.serverid = serverid
-        self._storageserver = storage_server # to an RIStorageServer
+        self._server = server
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
 
-        wbp = layout.make_write_bucket_proxy(None, sharesize,
+        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                              blocksize, num_segments,
                                              num_share_hashes,
-                                             EXTENSION_SIZE, serverid)
+                                             EXTENSION_SIZE)
         self.wbp_class = wbp.__class__ # to create more of them
         self.allocated_size = wbp.get_allocated_size()
         self.blocksize = blocksize
@@ -96,34 +93,38 @@ class ServerTracker:
 
     def __repr__(self):
         return ("<ServerTracker for server %s and SI %s>"
-                % (idlib.shortnodeid_b2a(self.serverid),
-                   si_b2a(self.storage_index)[:5]))
+                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+
+    def get_serverid(self):
+        return self._server.get_serverid()
+    def get_name(self):
+        return self._server.get_name()
 
     def query(self, sharenums):
-        d = self._storageserver.callRemote("allocate_buckets",
-                                           self.storage_index,
-                                           self.renew_secret,
-                                           self.cancel_secret,
-                                           sharenums,
-                                           self.allocated_size,
-                                           canary=Referenceable())
+        rref = self._server.get_rref()
+        d = rref.callRemote("allocate_buckets",
+                            self.storage_index,
+                            self.renew_secret,
+                            self.cancel_secret,
+                            sharenums,
+                            self.allocated_size,
+                            canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
 
     def ask_about_existing_shares(self):
-        return self._storageserver.callRemote("get_buckets",
-                                              self.storage_index)
+        rref = self._server.get_rref()
+        return rref.callRemote("get_buckets", self.storage_index)
 
     def _got_reply(self, (alreadygot, buckets)):
         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         b = {}
         for sharenum, rref in buckets.iteritems():
-            bp = self.wbp_class(rref, self.sharesize,
+            bp = self.wbp_class(rref, self._server, self.sharesize,
                                 self.blocksize,
                                 self.num_segments,
                                 self.num_share_hashes,
-                                EXTENSION_SIZE,
-                                self.serverid)
+                                EXTENSION_SIZE)
             b[sharenum] = bp
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))
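Throughout this hunk the old (serverid, rref) pair is replaced by a single server object. A minimal sketch of the interface the patch assumes that object provides, inferred from the call sites in this file (the real object comes from storage_broker.get_servers_for_psi(); the class name here is invented):

    class ServerSketch:
        # Invented stand-in for the broker-supplied server object; only
        # the four methods this patch relies on are shown.
        def __init__(self, serverid, name, rref, lease_seed):
            self._serverid = serverid      # 20-byte binary nodeid
            self._name = name              # short printable form, for logs
            self._rref = rref              # RIStorageServer remote reference
            self._lease_seed = lease_seed  # input to the per-bucket secrets
        def get_serverid(self):
            return self._serverid
        def get_name(self):
            return self._name
        def get_rref(self):
            return self._rref
        def get_lease_seed(self):
            return self._lease_seed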
@@ -147,7 +148,7 @@ class ServerTracker:
 
 
 def str_shareloc(shnum, bucketwriter):
-    return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+    return "%s: %s" % (shnum, bucketwriter.get_servername(),)
 
 class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
@@ -171,11 +172,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                          num_segments, total_shares, needed_shares,
                          servers_of_happiness):
         """
-        @return: (upload_trackers, already_servers), where upload_trackers is
-                 a set of ServerTracker instances that have agreed to hold
+        @return: (upload_trackers, already_serverids), where upload_trackers
+                 is a set of ServerTracker instances that have agreed to hold
                  some shares for us (the shareids are stashed inside the
-                 ServerTracker), and already_servers is a dict mapping shnum
-                 to a set of serverids which claim to already have the share.
+                 ServerTracker), and already_serverids is a dict mapping
+                 shnum to a set of serverids for servers which claim to
+                 already have the share.
         """
 
         if self._status:
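For illustration, the return value documented above has roughly this shape (all values invented):

    # Shapes only; real values come from the selection loop below.
    # upload_trackers: set of ServerTracker instances; the shnums each
    # server accepted live in tracker.buckets (shnum -> bucket proxy).
    already_serverids = {
        0: set(["\x01" * 20]),                 # shnum -> set of 20-byte
        4: set(["\x01" * 20, "\x02" * 20]),    # binary serverids
    }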
@@ -186,9 +188,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         self.needed_shares = needed_shares
 
         self.homeless_shares = set(range(total_shares))
-        self.contacted_trackers = [] # servers worth asking again
-        self.contacted_trackers2 = [] # servers that we have asked again
-        self._started_second_pass = False
         self.use_trackers = set() # ServerTrackers that have shares assigned
                                   # to them
         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
@@ -205,12 +204,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
 
         # figure out how much space to ask for
-        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
-                                             num_share_hashes, EXTENSION_SIZE,
-                                             None)
+        wbp = layout.make_write_bucket_proxy(None, None,
+                                             share_size, 0, num_segments,
+                                             num_share_hashes, EXTENSION_SIZE)
         allocated_size = wbp.get_allocated_size()
-        all_servers = [(s.get_serverid(), s.get_rref())
-                       for s in storage_broker.get_servers_for_psi(storage_index)]
+        all_servers = storage_broker.get_servers_for_psi(storage_index)
         if not all_servers:
             raise NoServersError("client gave us zero servers")
 
@@ -219,12 +217,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         # field) from getting large shares (for files larger than about
         # 12GiB). See #439 for details.
         def _get_maxsize(server):
-            (serverid, conn) = server
-            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+            v0 = server.get_rref().version
+            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
             return v1["maximum-immutable-share-size"]
-        writable_servers = [server for server in all_servers
+        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
-        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
 
         # decide upon the renewal/cancel secrets, to include them in the
         # allocate_buckets query.
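For reference, the version dictionary consulted by _get_maxsize() in this hunk nests the share-size cap under the v1 protocol key, so the filter keeps only servers whose advertised cap covers one whole share. A sketch with invented numbers:

    # Invented values; the real dict rides along with the remote reference.
    version = {
        "http://allmydata.org/tahoe/protocols/storage/v1": {
            "maximum-immutable-share-size": 10 * 1024 * 1024,
        },
    }
    v1 = version["http://allmydata.org/tahoe/protocols/storage/v1"]
    allocated_size = 1024 * 1024   # stand-in for wbp.get_allocated_size()
    writeable = v1["maximum-immutable-share-size"] >= allocated_size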
@@ -236,16 +234,33 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                      storage_index)
         def _make_trackers(servers):
-           return [ServerTracker(serverid, conn,
-                                 share_size, block_size,
-                                 num_segments, num_share_hashes,
-                                 storage_index,
-                                 bucket_renewal_secret_hash(file_renewal_secret,
-                                                            serverid),
-                                 bucket_cancel_secret_hash(file_cancel_secret,
-                                                           serverid))
-                   for (serverid, conn) in servers]
-        self.uncontacted_trackers = _make_trackers(writable_servers)
+            trackers = []
+            for s in servers:
+                seed = s.get_lease_seed()
+                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
+                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
+                st = ServerTracker(s,
+                                   share_size, block_size,
+                                   num_segments, num_share_hashes,
+                                   storage_index,
+                                   renew, cancel)
+                trackers.append(st)
+            return trackers
+
+        # We assign each server/tracker to one of three lists. They all
+        # start in the "first pass" list. During the first pass, as we ask
+        # each one to hold a share, we move their tracker to the "second
+        # pass" list, until the first-pass list is empty. Then during the
+        # second pass, as we ask each to hold more shares, we move their
+        # tracker to the "next pass" list, until the second-pass list is
+        # empty. Then we move everybody from the next-pass list back to the
+        # second-pass list and repeat the "second" pass (really the third,
+        # fourth, etc pass), until all shares are assigned, or we've run out
+        # of potential servers.
+        self.first_pass_trackers = _make_trackers(writeable_servers)
+        self.second_pass_trackers = [] # servers worth asking again
+        self.next_pass_trackers = [] # servers that we have asked again
+        self._started_second_pass = False
 
         # We don't try to allocate shares to these servers, since they've
         # said that they're incapable of storing shares of the size that we'd
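A hedged sketch of the pass rotation described in the comment block in this hunk, with invented names; the real _loop() also interleaves happiness checks, share redistribution, and error accounting:

    def place_shares(trackers, ask):
        # ask(tracker) -> "done" (no shares left homeless), "again"
        # (this server took shares and is worth asking again), or
        # "drop" (error or refusal; leave it out of the rotation).
        first_pass = list(trackers)
        second_pass, next_pass = [], []
        while first_pass or second_pass or next_pass:
            if first_pass:
                tracker, put_back = first_pass.pop(0), second_pass
            elif second_pass:
                tracker, put_back = second_pass.pop(0), next_pass
            else:
                # recycle for the third, fourth, ... pass
                second_pass, next_pass = next_pass, []
                continue
            result = ask(tracker)
            if result == "done":
                return True
            if result == "again":
                put_back.append(tracker)
        return False  # out of servers with shares still unplaced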
@@ -265,27 +280,26 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         for tracker in readonly_trackers:
             assert isinstance(tracker, ServerTracker)
             d = tracker.ask_about_existing_shares()
-            d.addBoth(self._handle_existing_response, tracker.serverid)
+            d.addBoth(self._handle_existing_response, tracker)
             ds.append(d)
             self.num_servers_contacted += 1
             self.query_count += 1
             self.log("asking server %s for any existing shares" %
-                     (idlib.shortnodeid_b2a(tracker.serverid),),
-                    level=log.NOISY)
+                     (tracker.get_name(),), level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
         return dl
 
 
-    def _handle_existing_response(self, res, serverid):
+    def _handle_existing_response(self, res, tracker):
         """
         I handle responses to the queries sent by
         Tahoe2ServerSelector._existing_shares.
         """
+        serverid = tracker.get_serverid()
         if isinstance(res, failure.Failure):
             self.log("%s got error during existing shares check: %s"
-                    % (idlib.shortnodeid_b2a(serverid), res),
-                    level=log.UNUSUAL)
+                    % (tracker.get_name(), res), level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
         else:
@@ -293,7 +307,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             if buckets:
                 self.serverids_with_shares.add(serverid)
             self.log("response to get_buckets() from server %s: alreadygot=%s"
-                    % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))),
+                    % (tracker.get_name(), tuple(sorted(buckets))),
                     level=log.NOISY)
             for bucket in buckets:
                 self.preexisting_shares.setdefault(bucket, set()).add(serverid)
@@ -324,7 +338,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     def _loop(self):
         if not self.homeless_shares:
-            merged = merge_peers(self.preexisting_shares, self.use_trackers)
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if self.servers_of_happiness <= effective_happiness:
                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
@@ -353,7 +367,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_trackers) and \
+                if delta <= len(self.first_pass_trackers) and \
                    shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
@@ -389,8 +403,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                     self.log(servmsg, level=log.INFREQUENT)
                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
 
-        if self.uncontacted_trackers:
-            tracker = self.uncontacted_trackers.pop(0)
+        if self.first_pass_trackers:
+            tracker = self.first_pass_trackers.pop(0)
             # TODO: don't pre-convert all serverids to ServerTrackers
             assert isinstance(tracker, ServerTracker)
 
@@ -401,42 +415,42 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(tracker.serverid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
             d = tracker.query(shares_to_ask)
             d.addBoth(self._got_response, tracker, shares_to_ask,
-                      self.contacted_trackers)
+                      self.second_pass_trackers)
             return d
-        elif self.contacted_trackers:
+        elif self.second_pass_trackers:
             # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_trackers))
-            tracker = self.contacted_trackers.pop(0)
+                                           len(self.second_pass_trackers))
+            tracker = self.second_pass_trackers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(tracker.serverid),
+                                        % (tracker.get_name(),
                                            len(self.homeless_shares)))
             d = tracker.query(shares_to_ask)
             d.addBoth(self._got_response, tracker, shares_to_ask,
-                      self.contacted_trackers2)
+                      self.next_pass_trackers)
             return d
-        elif self.contacted_trackers2:
+        elif self.next_pass_trackers:
             # we've finished the second-or-later pass. Move all the remaining
-            # servers back into self.contacted_trackers for the next pass.
-            self.contacted_trackers.extend(self.contacted_trackers2)
-            self.contacted_trackers2[:] = []
+            # servers back into self.second_pass_trackers for the next pass.
+            self.second_pass_trackers.extend(self.next_pass_trackers)
+            self.next_pass_trackers[:] = []
             return self._loop()
         else:
             # no more servers. If we haven't placed enough shares, we fail.
-            merged = merge_peers(self.preexisting_shares, self.use_trackers)
+            merged = merge_servers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
                 msg = failure_message(len(self.serverids_with_shares),
@@ -467,9 +481,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_trackers
-                or self.contacted_trackers
-                or self.contacted_trackers2):
+            if (self.first_pass_trackers
+                or self.second_pass_trackers
+                or self.next_pass_trackers):
                 # there is still hope, so just loop
                 pass
             else:
@@ -483,12 +497,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         else:
             (alreadygot, allocated) = res
             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(tracker.serverid),
+                    % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
             progress = False
             for s in alreadygot:
-                self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
+                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
@@ -502,7 +516,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 progress = True
 
             if allocated or alreadygot:
-                self.serverids_with_shares.add(tracker.serverid)
+                self.serverids_with_shares.add(tracker.get_serverid())
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
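A worked example of the bookkeeping above, with invented share numbers:

    shares_to_ask = set([0, 1, 2])
    alreadygot = set([1])     # the server reports it already holds sh1
    allocated = set([0])      # and it accepted sh0 in this query
    not_yet_present = shares_to_ask - set(alreadygot)  # set([0, 2])
    still_homeless = not_yet_present - set(allocated)  # set([2]): sh2
                                                       # must go elsewhere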
@@ -920,39 +934,41 @@ class CHKUploader:
         d.addCallback(_done)
         return d
 
-    def set_shareholders(self, (upload_trackers, already_servers), encoder):
+    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
         """
         @param upload_trackers: a sequence of ServerTracker objects that
                                 have agreed to hold some shares for us (the
                                 shareids are stashed inside the ServerTracker)
-        @paran already_servers: a dict mapping sharenum to a set of serverids
-                                that claim to already have this share
+
+        @param already_serverids: a dict mapping sharenum to a set of
+                                  serverids for servers that claim to already
+                                  have this share
         """
-        msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
+        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
         values = ([', '.join([str_shareloc(k,v)
                               for k,v in st.buckets.iteritems()])
-                   for st in upload_trackers], already_servers)
+                   for st in upload_trackers], already_serverids)
         self.log(msgtempl % values, level=log.OPERATIONAL)
         # record already-present shares in self._results
-        self._results.preexisting_shares = len(already_servers)
+        self._results.preexisting_shares = len(already_serverids)
 
         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
         for tracker in upload_trackers:
             assert isinstance(tracker, ServerTracker)
         buckets = {}
-        servermap = already_servers.copy()
+        servermap = already_serverids.copy()
         for tracker in upload_trackers:
             buckets.update(tracker.buckets)
             for shnum in tracker.buckets:
                 self._server_trackers[shnum] = tracker
-                servermap.setdefault(shnum, set()).add(tracker.serverid)
+                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
         assert len(buckets) == sum([len(tracker.buckets)
                                     for tracker in upload_trackers]), \
             "%s (%s) != %s (%s)" % (
                 len(buckets),
                 buckets,
                 sum([len(tracker.buckets) for tracker in upload_trackers]),
-                [(t.buckets, t.serverid) for t in upload_trackers]
+                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                 )
         encoder.set_shareholders(buckets, servermap)
 
@@ -961,7 +977,7 @@ class CHKUploader:
         r = self._results
         for shnum in self._encoder.get_shares_placed():
             server_tracker = self._server_trackers[shnum]
-            serverid = server_tracker.serverid
+            serverid = server_tracker.get_serverid()
             r.sharemap.add(shnum, serverid)
             r.servermap.add(serverid, shnum)
         r.pushed_shares = len(self._encoder.get_shares_placed())
@@ -1393,9 +1409,10 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
-    def __init__(self, helper_furl=None, stats_provider=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
+        self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
@@ -1431,7 +1448,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
 
 
-    def upload(self, uploadable, history=None):
+    def upload(self, uploadable):
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
@@ -1467,8 +1484,8 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None
-                if history:
-                    history.add_upload(uploader.get_upload_status())
+                if self._history:
+                    self._history.add_upload(uploader.get_upload_status())
                 def turn_verifycap_into_read_cap(uploadresults):
                     # Generate the uri from the verifycap plus the key.
                     d3 = uploadable.get_encryption_key()