]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
upload-helper: avoid duplicate uploads: check the grid to see if the file already...
authorBrian Warner <warner@allmydata.com>
Thu, 31 Jan 2008 01:49:02 +0000 (18:49 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 31 Jan 2008 01:49:02 +0000 (18:49 -0700)
src/allmydata/offloaded.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_system.py

index 6cb9f98a9da8dffa87dc2c3ca74a7819c582a03a..2a5f6bfbb7dab573a2a814b5744eb4c8dc5b5e6b 100644 (file)
@@ -5,14 +5,125 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 from foolscap.eventual import eventually
-from allmydata import upload, interfaces
-from allmydata.util import idlib, log, observer, fileutil
+from allmydata import upload, interfaces, storage, uri
+from allmydata.util import idlib, log, observer, fileutil, hashutil
 
 
 class NotEnoughWritersError(Exception):
     pass
 
 
+class CHKCheckerAndUEBFetcher:
+    """I check to see if a file is already present in the grid. I also fetch
+    the URI Extension Block, which is useful for an uploading client who
+    wants to avoid the work of encryption and encoding.
+
+    I return False if the file is not completely healthy: i.e. if there are
+    less than 'N' shares present.
+
+    If the file is completely healthy, I return a tuple of (sharemap,
+    UEB_data, UEB_hash).
+    """
+
+    def __init__(self, peer_getter, storage_index, logparent=None):
+        self._peer_getter = peer_getter
+        self._found_shares = set()
+        self._storage_index = storage_index
+        self._sharemap = {}
+        self._readers = set()
+        self._ueb_hash = None
+        self._ueb_data = None
+        self._logparent = logparent
+
+    def log(self, *args, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
+        if 'parent' not in kwargs:
+            kwargs['parent'] = self._logparent
+        return log.msg(*args, **kwargs)
+
+    def check(self):
+        d = self._get_all_shareholders(self._storage_index)
+        d.addCallback(self._get_uri_extension)
+        d.addCallback(self._done)
+        return d
+
+    def _get_all_shareholders(self, storage_index):
+        dl = []
+        for (pmpeerid, peerid, connection) in self._peer_getter(storage_index):
+            d = connection.callRemote("get_service", "storageserver")
+            d.addCallback(lambda ss: ss.callRemote("get_buckets",
+                                                   storage_index))
+            d.addCallbacks(self._got_response, self._got_error,
+                           callbackArgs=(peerid,))
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def _got_response(self, buckets, peerid):
+        # buckets is a dict: maps shum to an rref of the server who holds it
+        shnums_s = ",".join([str(shnum) for shnum in buckets])
+        self.log("got_response: [%s] has %d shares (%s)" %
+                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
+                 level=log.NOISY)
+        self._found_shares.update(buckets.keys())
+        for k in buckets:
+            if k not in self._sharemap:
+                self._sharemap[k] = []
+            self._sharemap[k].append(peerid)
+        self._readers.update(buckets.values())
+
+    def _got_error(self, f):
+        if f.check(KeyError):
+            pass
+        log.err(f, parent=self._logparent)
+        pass
+
+    def _get_uri_extension(self, res):
+        # assume that we can pull the UEB from any share. If we get an error,
+        # declare the whole file unavailable.
+        if not self._readers:
+            self.log("no readers, so no UEB", level=log.NOISY)
+            return
+        b = self._readers.pop()
+        rbp = storage.ReadBucketProxy(b)
+        d = rbp.startIfNecessary()
+        d.addCallback(lambda res: rbp.get_uri_extension())
+        d.addCallback(self._got_uri_extension)
+        d.addErrback(self._ueb_error)
+        return d
+
+    def _got_uri_extension(self, ueb):
+        self.log("_got_uri_extension", level=log.NOISY)
+        self._ueb_hash = hashutil.uri_extension_hash(ueb)
+        self._ueb_data = uri.unpack_extension(ueb)
+
+    def _ueb_error(self, f):
+        # an error means the file is unavailable, but the overall check
+        # shouldn't fail.
+        self.log("UEB fetch failed", failure=f, level=log.WEIRD)
+        return None
+
+    def _done(self, res):
+        if self._ueb_data:
+            found = len(self._found_shares)
+            total = self._ueb_data['total_shares']
+            self.log(format="got %(found)d shares of %(total)d",
+                     found=found, total=total, level=log.NOISY)
+            if found < total:
+                # not all shares are present in the grid
+                self.log("not enough to qualify, file not found in grid",
+                         level=log.NOISY)
+                return False
+            # all shares are present
+            self.log("all shares present, file is found in grid",
+                     level=log.NOISY)
+            return (self._sharemap, self._ueb_data, self._ueb_hash)
+        # no shares are present
+        self.log("unable to find UEB data, file not found in grid",
+                 level=log.NOISY)
+        return False
+
+
 class CHKUploadHelper(Referenceable, upload.CHKUploader):
     """I am the helper-server -side counterpart to AssistedUploader. I handle
     peer selection, encoding, and share pushing. I read ciphertext from the
@@ -63,13 +174,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
             return ({}, self)
-        # we don't remember uploading this file, but it might already be in
-        # the grid. For now we do an unconditional upload. TODO: Do a quick
-        # checker run (send one query to each storage server) to see who has
-        # the file. Then accomodate a lazy uploader by retrieving the UEB
-        # from one of the shares and hash it.
-        #return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
-        self.log("no record of having uploaded the file", level=log.NOISY)
+        # we don't remember uploading this file
+        self.log("no ciphertext yet", level=log.NOISY)
         return ({}, self)
 
     def remote_upload(self, reader):
@@ -352,13 +458,47 @@ class Helper(Referenceable, service.MultiService):
         if storage_index in self._active_uploads:
             self.log("upload is currently active", parent=lp)
             uh = self._active_uploads[storage_index]
-        else:
-            self.log("creating new upload helper", parent=lp)
-            uh = self.chk_upload_helper_class(storage_index, self,
-                                              incoming_file, encoding_file,
-                                              lp)
-            self._active_uploads[storage_index] = uh
-        return uh.start()
+            return uh.start()
+
+        d = self._check_for_chk_already_in_grid(storage_index, lp)
+        def _checked(upload_results):
+            if upload_results:
+                return (upload_results, None)
+
+            # the file is not present in the grid, by which we mean there are
+            # less than 'N' shares available.
+            self.log("unable to find file in the grid", level=log.NOISY)
+            # We need an upload helper. Check our active uploads again in
+            # case there was a race.
+            if storage_index in self._active_uploads:
+                self.log("upload is currently active", parent=lp)
+                uh = self._active_uploads[storage_index]
+            else:
+                self.log("creating new upload helper", parent=lp)
+                uh = self.chk_upload_helper_class(storage_index, self,
+                                                  incoming_file, encoding_file,
+                                                  lp)
+                self._active_uploads[storage_index] = uh
+            return uh.start()
+        d.addCallback(_checked)
+        return d
+
+    def _check_for_chk_already_in_grid(self, storage_index, lp):
+        # see if this file is already in the grid
+        lp2 = self.log("doing a quick check+UEBfetch",
+                       parent=lp, level=log.NOISY)
+        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
+                                    storage_index, lp2)
+        d = c.check()
+        def _checked(res):
+            if res:
+                (sharemap, ueb_data, ueb_hash) = res
+                self.log("found file in grid", level=log.NOISY, parent=lp)
+                upload_results = {'uri_extension_hash': ueb_hash}
+                return upload_results
+            return False
+        d.addCallback(_checked)
+        return d
 
     def upload_finished(self, storage_index):
         del self._active_uploads[storage_index]
index fd8cbf80bf626aee9c66ab41e643c3bc91815b2b..cf766ac732605a9e66ba6c03018884ade9cbabed 100644 (file)
@@ -43,6 +43,8 @@ class FakeClient(service.MultiService):
         return True
     def get_encoding_parameters(self):
         return self.DEFAULT_ENCODING_PARAMETERS
+    def get_permuted_peers(self, storage_index):
+        return []
 
 def flush_but_dont_ignore(res):
     d = eventual.flushEventualQueue()
index 842adb2268d0003a3c389c92c0232ae4a03a2d46..75de3da627acabedccfcd27ac4c68bc4a3de7517 100644 (file)
@@ -289,19 +289,35 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             extra_node.getServiceNamed("storageserver").sizelimit = 0
         d.addCallback(_added)
 
+        HELPER_DATA = "Data that needs help to upload" * 1000
         def _upload_with_helper(res):
-            DATA = "Data that needs help to upload" * 1000
-            u = upload.Data(DATA, contenthashkey=contenthashkey)
+            u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
             d = self.extra_node.upload(u)
             def _uploaded(uri):
                 return self.downloader.download_to_data(uri)
             d.addCallback(_uploaded)
             def _check(newdata):
-                self.failUnlessEqual(newdata, DATA)
+                self.failUnlessEqual(newdata, HELPER_DATA)
             d.addCallback(_check)
             return d
         d.addCallback(_upload_with_helper)
 
+        def _upload_duplicate_with_helper(res):
+            u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
+            u.debug_stash_RemoteEncryptedUploadable = True
+            d = self.extra_node.upload(u)
+            def _uploaded(uri):
+                return self.downloader.download_to_data(uri)
+            d.addCallback(_uploaded)
+            def _check(newdata):
+                self.failUnlessEqual(newdata, HELPER_DATA)
+                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
+                            "uploadable started uploading, should have been avoided")
+            d.addCallback(_check)
+            return d
+        if contenthashkey:
+            d.addCallback(_upload_duplicate_with_helper)
+
         def _upload_resumable(res):
             DATA = "Data that needs help to upload and gets interrupted" * 1000
             u1 = upload.Data(DATA, contenthashkey=contenthashkey)