]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/offloaded.py
Cosmetic changes to match cloud backend branch.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / offloaded.py
index c8da456de6098b4b7c9d2a7134817a16cbb226e8..fbd756be291bc46f38a16b952b7f4f0460cff307 100644 (file)
@@ -1,7 +1,6 @@
 
 import os, stat, time, weakref
 from zope.interface import implements
-from twisted.application import service
 from twisted.internet import defer
 from foolscap.api import Referenceable, DeadReferenceError, eventually
 import allmydata # for __full_version__
@@ -10,7 +9,7 @@ from allmydata.storage.server import si_b2a
 from allmydata.immutable import upload
 from allmydata.immutable.layout import ReadBucketProxy
 from allmydata.util.assertutil import precondition
-from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil
+from allmydata.util import log, observer, fileutil, hashutil, dictutil
 
 
 class NotEnoughWritersError(Exception):
@@ -54,23 +53,23 @@ class CHKCheckerAndUEBFetcher:
 
     def _get_all_shareholders(self, storage_index):
         dl = []
-        for (peerid, ss) in self._peer_getter("storage", storage_index):
-            d = ss.callRemote("get_buckets", storage_index)
+        for s in self._peer_getter(storage_index):
+            d = s.get_rref().callRemote("get_buckets", storage_index)
             d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
+                           callbackArgs=(s,))
             dl.append(d)
         return defer.DeferredList(dl)
 
-    def _got_response(self, buckets, peerid):
+    def _got_response(self, buckets, server):
         # buckets is a dict: maps shum to an rref of the server who holds it
         shnums_s = ",".join([str(shnum) for shnum in buckets])
         self.log("got_response: [%s] has %d shares (%s)" %
-                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
+                 (server.get_name(), len(buckets), shnums_s),
                  level=log.NOISY)
         self._found_shares.update(buckets.keys())
         for k in buckets:
-            self._sharemap.add(k, peerid)
-        self._readers.update( [ (bucket, peerid)
+            self._sharemap.add(k, server.get_serverid())
+        self._readers.update( [ (bucket, server)
                                 for bucket in buckets.values() ] )
 
     def _got_error(self, f):
@@ -85,8 +84,8 @@ class CHKCheckerAndUEBFetcher:
         if not self._readers:
             self.log("no readers, so no UEB", level=log.NOISY)
             return
-        b,peerid = self._readers.pop()
-        rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
+        b,server = self._readers.pop()
+        rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index))
         d = rbp.get_uri_extension()
         d.addCallback(self._got_uri_extension)
         d.addErrback(self._ueb_error)
@@ -135,16 +134,16 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
                 "application-version": str(allmydata.__full_version__),
                 }
 
-    def __init__(self, storage_index, helper,
+    def __init__(self, storage_index,
+                 helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
-                 results, log_number):
+                 log_number):
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._encoding_file = encoding_file
         self._upload_id = si_b2a(storage_index)[:5]
         self._log_number = log_number
-        self._results = results
         self._upload_status = upload.UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_storage_index(storage_index)
@@ -153,12 +152,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                          parent=log_number)
 
-        self._client = helper.parent
+        self._storage_broker = storage_broker
+        self._secret_holder = secret_holder
         self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                              self._log_number)
         self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
         self._finished_observers = observer.OneShotObserverList()
 
+        self._started = time.time()
         d = self._fetcher.when_done()
         d.addCallback(lambda res: self._reader.start())
         d.addCallback(lambda res: self.start_encrypted(self._reader))
@@ -170,31 +171,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             kwargs['facility'] = "tahoe.helper.chk"
         return upload.CHKUploader.log(self, *args, **kwargs)
 
-    def start(self):
-        self._started = time.time()
-        # determine if we need to upload the file. If so, return ({},self) .
-        # If not, return (UploadResults,None) .
+    def remote_get_version(self):
+        return self.VERSION
+
+    def remote_upload(self, reader):
+        # reader is an RIEncryptedUploadable. I am specified to return an
+        # UploadResults dictionary.
+
+        # Log how much ciphertext we need to get.
         self.log("deciding whether to upload the file or not", level=log.NOISY)
         if os.path.exists(self._encoding_file):
             # we have the whole file, and we might be encoding it (or the
             # encode/upload might have failed, and we need to restart it).
             self.log("ciphertext already in place", level=log.UNUSUAL)
-            return (self._results, self)
-        if os.path.exists(self._incoming_file):
+        elif os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return (self._results, self)
-        # we don't remember uploading this file
-        self.log("no ciphertext yet", level=log.NOISY)
-        return (self._results, self)
-
-    def remote_get_version(self):
-        return self.VERSION
-
-    def remote_upload(self, reader):
-        # reader is an RIEncryptedUploadable. I am specified to return an
-        # UploadResults dictionary.
+        else:
+            # we don't remember uploading this file
+            self.log("no ciphertext yet", level=log.NOISY)
 
         # let our fetcher pull ciphertext from the reader.
         self._fetcher.add_reader(reader)
@@ -204,19 +200,38 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # and inform the client when the upload has finished
         return self._finished_observers.when_fired()
 
-    def _finished(self, uploadresults):
-        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
-        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
-        r = uploadresults
-        v = uri.from_string(r.verifycapstr)
-        r.uri_extension_hash = v.uri_extension_hash
+    def _finished(self, ur):
+        assert interfaces.IUploadResults.providedBy(ur), ur
+        vcapstr = ur.get_verifycapstr()
+        precondition(isinstance(vcapstr, str), vcapstr)
+        v = uri.from_string(vcapstr)
         f_times = self._fetcher.get_times()
-        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
-        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
-        r.timings["total_fetch"] = f_times["total"]
+
+        hur = upload.HelperUploadResults()
+        hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
+                       "total_fetch": f_times["total"],
+                       }
+        for key,val in ur.get_timings().items():
+            hur.timings[key] = val
+        hur.uri_extension_hash = v.uri_extension_hash
+        hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+        hur.preexisting_shares = ur.get_preexisting_shares()
+        # hur.sharemap needs to be {shnum: set(serverid)}
+        hur.sharemap = {}
+        for shnum, servers in ur.get_sharemap().items():
+            hur.sharemap[shnum] = set([s.get_serverid() for s in servers])
+        # and hur.servermap needs to be {serverid: set(shnum)}
+        hur.servermap = {}
+        for server, shnums in ur.get_servermap().items():
+            hur.servermap[server.get_serverid()] = set(shnums)
+        hur.pushed_shares = ur.get_pushed_shares()
+        hur.file_size = ur.get_file_size()
+        hur.uri_extension_data = ur.get_uri_extension_data()
+        hur.verifycapstr = vcapstr
+
         self._reader.close()
         os.unlink(self._encoding_file)
-        self._finished_observers.fire(r)
+        self._finished_observers.fire(hur)
         self._helper.upload_finished(self._storage_index, v.size)
         del self._reader
 
@@ -302,6 +317,9 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
         if os.path.exists(self._encoding_file):
             self.log("ciphertext already present, bypassing fetch",
                      level=log.UNUSUAL)
+            # XXX the following comment is probably stale, since
+            # LocalCiphertextReader.get_plaintext_hashtree_leaves does not exist.
+            #
             # we'll still need the plaintext hashes (when
             # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
             # called), and currently the easiest way to get them is to ask
@@ -310,14 +328,12 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
             # else.
             have = os.stat(self._encoding_file)[stat.ST_SIZE]
             d = self.call("read_encrypted", have-1, 1)
-            d.addCallback(self._done2, started)
-            return
-
-        # first, find out how large the file is going to be
-        d = self.call("get_size")
-        d.addCallback(self._got_size)
-        d.addCallback(self._start_reading)
-        d.addCallback(self._done)
+        else:
+            # first, find out how large the file is going to be
+            d = self.call("get_size")
+            d.addCallback(self._got_size)
+            d.addCallback(self._start_reading)
+            d.addCallback(self._done)
         d.addCallback(self._done2, started)
         d.addErrback(self._failed)
 
@@ -469,11 +485,7 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
         d = defer.maybeDeferred(self.f.read, length)
         d.addCallback(lambda data: [data])
         return d
-    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
-        return self.call("get_plaintext_hashtree_leaves", first, last,
-                         num_segments)
-    def get_plaintext_hash(self):
-        return self.call("get_plaintext_hash")
+
     def close(self):
         self.f.close()
         # ??. I'm not sure if it makes sense to forward the close message.
@@ -481,7 +493,7 @@ class LocalCiphertextReader(AskUntilSuccessMixin):
 
 
 
-class Helper(Referenceable, service.MultiService):
+class Helper(Referenceable):
     implements(interfaces.RIHelper, interfaces.IStatsProducer)
     # this is the non-distributed version. When we need to have multiple
     # helpers, this object will become the HelperCoordinator, and will query
@@ -494,19 +506,19 @@ class Helper(Referenceable, service.MultiService):
                  { },
                 "application-version": str(allmydata.__full_version__),
                 }
-    chk_upload_helper_class = CHKUploadHelper
     MAX_UPLOAD_STATUSES = 10
 
-    def __init__(self, basedir, stats_provider=None):
+    def __init__(self, basedir, storage_broker, secret_holder,
+                 stats_provider, history):
         self._basedir = basedir
+        self._storage_broker = storage_broker
+        self._secret_holder = secret_holder
         self._chk_incoming = os.path.join(basedir, "CHK_incoming")
         self._chk_encoding = os.path.join(basedir, "CHK_encoding")
         fileutil.make_dirs(self._chk_incoming)
         fileutil.make_dirs(self._chk_encoding)
         self._active_uploads = {}
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
-        self._all_upload_statuses = weakref.WeakKeyDictionary()
-        self._recent_upload_statuses = []
         self.stats_provider = stats_provider
         if stats_provider:
             stats_provider.register_producer(self)
@@ -517,15 +529,12 @@ class Helper(Referenceable, service.MultiService):
                           "chk_upload_helper.fetched_bytes": 0,
                           "chk_upload_helper.encoded_bytes": 0,
                           }
-        service.MultiService.__init__(self)
-
-    def setServiceParent(self, parent):
-        service.MultiService.setServiceParent(self, parent)
+        self._history = history
 
     def log(self, *args, **kwargs):
         if 'facility' not in kwargs:
             kwargs['facility'] = "tahoe.helper"
-        return self.parent.log(*args, **kwargs)
+        return log.msg(*args, **kwargs)
 
     def count(self, key, value=1):
         if self.stats_provider:
@@ -571,46 +580,15 @@ class Helper(Referenceable, service.MultiService):
 
     def remote_upload_chk(self, storage_index):
         self.count("chk_upload_helper.upload_requests")
-        r = upload.UploadResults()
-        started = time.time()
-        si_s = si_b2a(storage_index)
-        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
-        incoming_file = os.path.join(self._chk_incoming, si_s)
-        encoding_file = os.path.join(self._chk_encoding, si_s)
+        lp = self.log(format="helper: upload_chk query for SI %(si)s",
+                      si=si_b2a(storage_index))
         if storage_index in self._active_uploads:
             self.log("upload is currently active", parent=lp)
             uh = self._active_uploads[storage_index]
-            return uh.start()
-
-        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
-        def _checked(already_present):
-            elapsed = time.time() - started
-            r.timings['existence_check'] = elapsed
-            if already_present:
-                # the necessary results are placed in the UploadResults
-                self.count("chk_upload_helper.upload_already_present")
-                self.log("file already found in grid", parent=lp)
-                return (r, None)
-
-            self.count("chk_upload_helper.upload_need_upload")
-            # the file is not present in the grid, by which we mean there are
-            # less than 'N' shares available.
-            self.log("unable to find file in the grid", parent=lp,
-                     level=log.NOISY)
-            # We need an upload helper. Check our active uploads again in
-            # case there was a race.
-            if storage_index in self._active_uploads:
-                self.log("upload is currently active", parent=lp)
-                uh = self._active_uploads[storage_index]
-            else:
-                self.log("creating new upload helper", parent=lp)
-                uh = self.chk_upload_helper_class(storage_index, self,
-                                                  incoming_file, encoding_file,
-                                                  r, lp)
-                self._active_uploads[storage_index] = uh
-                self._add_upload(uh)
-            return uh.start()
-        d.addCallback(_checked)
+            return (None, uh)
+
+        d = self._check_chk(storage_index, lp)
+        d.addCallback(self._did_chk_check, storage_index, lp)
         def _err(f):
             self.log("error while checking for chk-already-in-grid",
                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@@ -618,34 +596,68 @@ class Helper(Referenceable, service.MultiService):
         d.addErrback(_err)
         return d
 
-    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+    def _check_chk(self, storage_index, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
-        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
-                                    storage_index, lp2)
+        sb = self._storage_broker
+        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
         d = c.check()
         def _checked(res):
             if res:
                 (sharemap, ueb_data, ueb_hash) = res
                 self.log("found file in grid", level=log.NOISY, parent=lp)
-                results.uri_extension_hash = ueb_hash
-                results.sharemap = sharemap
-                results.uri_extension_data = ueb_data
-                results.preexisting_shares = len(sharemap)
-                results.pushed_shares = 0
-                return True
-            return False
+                hur = upload.HelperUploadResults()
+                hur.uri_extension_hash = ueb_hash
+                hur.sharemap = sharemap
+                hur.uri_extension_data = ueb_data
+                hur.preexisting_shares = len(sharemap)
+                hur.pushed_shares = 0
+                return hur
+            return None
         d.addCallback(_checked)
         return d
 
+    def _did_chk_check(self, already_present, storage_index, lp):
+        if already_present:
+            # the necessary results are placed in the UploadResults
+            self.count("chk_upload_helper.upload_already_present")
+            self.log("file already found in grid", parent=lp)
+            return (already_present, None)
+
+        self.count("chk_upload_helper.upload_need_upload")
+        # the file is not present in the grid, by which we mean there are
+        # less than 'N' shares available.
+        self.log("unable to find file in the grid", parent=lp,
+                 level=log.NOISY)
+        # We need an upload helper. Check our active uploads again in
+        # case there was a race.
+        if storage_index in self._active_uploads:
+            self.log("upload is currently active", parent=lp)
+            uh = self._active_uploads[storage_index]
+        else:
+            self.log("creating new upload helper", parent=lp)
+            uh = self._make_chk_upload_helper(storage_index, lp)
+            self._active_uploads[storage_index] = uh
+            self._add_upload(uh)
+        return (None, uh)
+
+    def _make_chk_upload_helper(self, storage_index, lp):
+        si_s = si_b2a(storage_index)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        uh = CHKUploadHelper(storage_index, self,
+                             self._storage_broker,
+                             self._secret_holder,
+                             incoming_file, encoding_file,
+                             lp)
+        return uh
+
     def _add_upload(self, uh):
         self._all_uploads[uh] = None
-        s = uh.get_upload_status()
-        self._all_upload_statuses[s] = None
-        self._recent_upload_statuses.append(s)
-        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
-            self._recent_upload_statuses.pop(0)
+        if self._history:
+            s = uh.get_upload_status()
+            self._history.notify_helper_upload(s)
 
     def upload_finished(self, storage_index, size):
         # this is called with size=0 if the upload failed
@@ -654,6 +666,3 @@ class Helper(Referenceable, service.MultiService):
         del self._active_uploads[storage_index]
         s = uh.get_upload_status()
         s.set_active(False)
-
-    def get_all_upload_statuses(self):
-        return self._all_upload_statuses