]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
CHK upload helper: don't let one failed upload prevent us from trying again
authorBrian Warner <warner@allmydata.com>
Mon, 28 Jan 2008 19:58:13 +0000 (12:58 -0700)
committerBrian Warner <warner@allmydata.com>
Mon, 28 Jan 2008 19:58:13 +0000 (12:58 -0700)
src/allmydata/offloaded.py
src/allmydata/test/test_helper.py

index 584c0c0179505cdfd4b4160748ae4bf07f58e3a8..7549a4e75ac82d1f21cb3ca5a29c820bd0318604 100644 (file)
@@ -4,6 +4,7 @@ from zope.interface import implements
 from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
+from foolscap.eventual import eventually
 from allmydata import upload, interfaces
 from allmydata.util import idlib, log, observer, fileutil
 
@@ -53,11 +54,10 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # If not, return (UploadResults,None) .
         self.log("deciding whether to upload the file or not", level=log.NOISY)
         if os.path.exists(self._encoding_file):
-            # we have the whole file, and we're currently encoding it. The
-            # caller will get to see the results when we're done. TODO: how
-            # should they get upload progress in this case?
-            self.log("encoding in progress", level=log.UNUSUAL)
-            return self._finished_observers.when_fired()
+            # we have the whole file, and we might be encoding it (or the
+            # encode/upload might have failed, and we need to restart it).
+            self.log("ciphertext already in place", level=log.UNUSUAL)
+            return ({}, self)
         if os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
@@ -76,11 +76,6 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # reader is an RIEncryptedUploadable. I am specified to return an
         # UploadResults dictionary.
 
-        if os.path.exists(self._encoding_file):
-            # we've already started encoding, so we have no use for the
-            # reader. Notify them when we're done.
-            return self._finished_observers.when_fired()
-
         # let our fetcher pull ciphertext from the reader.
         self._fetcher.add_reader(reader)
         # and also hashes
@@ -159,13 +154,18 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
 
     def add_reader(self, reader):
         AskUntilSuccessMixin.add_reader(self, reader)
-        self._start()
+        eventually(self._start)
 
     def _start(self):
         if self._started:
             return
         self._started = True
 
+        if os.path.exists(self._encoding_file):
+            self.log("ciphertext already present, bypassing fetch",
+                     level=log.UNUSUAL)
+            return self._done2()
+
         # first, find out how large the file is going to be
         d = self.call("get_size")
         d.addCallback(self._got_size)
@@ -249,11 +249,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin):
     def _done(self, res):
         self._f.close()
         self._f = None
-        self._readers = []
         self.log(format="done fetching ciphertext, size=%(size)d",
                  size=os.stat(self._incoming_file)[stat.ST_SIZE],
                  level=log.NOISY)
         os.rename(self._incoming_file, self._encoding_file)
+        return self._done2()
+
+    def _done2(self):
+        self._readers = []
         self._done_observers.fire(None)
 
     def _failed(self, f):
index 7707a15158a1318810c8486d1d5992921904d2ef..fd8cbf80bf626aee9c66ab41e643c3bc91815b2b 100644 (file)
@@ -7,7 +7,8 @@ from foolscap import Tub, eventual
 from foolscap.logging import log
 
 from allmydata import upload, offloaded
-from allmydata.util import hashutil, fileutil
+from allmydata.util import hashutil, fileutil, idlib
+from pycryptopp.cipher.aes import AES
 
 MiB = 1024*1024
 
@@ -106,6 +107,48 @@ class AssistedUpload(unittest.TestCase):
 
         return d
 
+    def test_previous_upload_failed(self):
+        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
+        self.setUpHelper(self.basedir)
+        DATA = "I need help\n" * 1000
+
+        # we want to make sure that an upload which fails (leaving the
+        # ciphertext in the CHK_encoding/ directory) does not prevent a later
+        # attempt to upload that file from working. We simulate this by
+        # populating the directory manually.
+        key = hashutil.key_hash(DATA)[:16]
+        encryptor = AES(key)
+        SI = hashutil.storage_index_chk_hash(key)
+        SI_s = idlib.b2a(SI)
+        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
+        f = open(encfile, "wb")
+        f.write(encryptor.process(DATA))
+        f.close()
+
+        u = upload.Uploader(self.helper_furl)
+        u.setServiceParent(self.s)
+
+        # wait a few turns
+        d = eventual.fireEventually()
+        d.addCallback(eventual.fireEventually)
+        d.addCallback(eventual.fireEventually)
+
+        def _ready(res):
+            assert u._helper
+            return u.upload_data(DATA)
+        d.addCallback(_ready)
+        def _uploaded(uri):
+            assert "CHK" in uri
+        d.addCallback(_uploaded)
+
+        def _check_empty(res):
+            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
+            self.failUnlessEqual(files, [])
+            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
+            self.failUnlessEqual(files, [])
+        d.addCallback(_check_empty)
+
+        return d
 
     def test_already_uploaded(self):
         self.basedir = "helper/AssistedUpload/test_already_uploaded"