From: Brian Warner Date: Mon, 28 Jan 2008 19:58:13 +0000 (-0700) Subject: CHK upload helper: don't let one failed upload prevent us from trying again X-Git-Tag: allmydata-tahoe-0.8.0~220 X-Git-Url: https://git.rkrishnan.org/specifications/-?a=commitdiff_plain;h=69a0b5cc001ff5dfbfd87d748e63969e446abada;p=tahoe-lafs%2Ftahoe-lafs.git CHK upload helper: don't let one failed upload prevent us from trying again --- diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 584c0c01..7549a4e7 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -4,6 +4,7 @@ from zope.interface import implements from twisted.application import service from twisted.internet import defer from foolscap import Referenceable +from foolscap.eventual import eventually from allmydata import upload, interfaces from allmydata.util import idlib, log, observer, fileutil @@ -53,11 +54,10 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # If not, return (UploadResults,None) . self.log("deciding whether to upload the file or not", level=log.NOISY) if os.path.exists(self._encoding_file): - # we have the whole file, and we're currently encoding it. The - # caller will get to see the results when we're done. TODO: how - # should they get upload progress in this case? - self.log("encoding in progress", level=log.UNUSUAL) - return self._finished_observers.when_fired() + # we have the whole file, and we might be encoding it (or the + # encode/upload might have failed, and we need to restart it). + self.log("ciphertext already in place", level=log.UNUSUAL) + return ({}, self) if os.path.exists(self._incoming_file): # we have some of the file, but not all of it (otherwise we'd be # encoding). The caller might be useful. @@ -76,11 +76,6 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # reader is an RIEncryptedUploadable. I am specified to return an # UploadResults dictionary. - if os.path.exists(self._encoding_file): - # we've already started encoding, so we have no use for the - # reader. Notify them when we're done. - return self._finished_observers.when_fired() - # let our fetcher pull ciphertext from the reader. self._fetcher.add_reader(reader) # and also hashes @@ -159,13 +154,18 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): def add_reader(self, reader): AskUntilSuccessMixin.add_reader(self, reader) - self._start() + eventually(self._start) def _start(self): if self._started: return self._started = True + if os.path.exists(self._encoding_file): + self.log("ciphertext already present, bypassing fetch", + level=log.UNUSUAL) + return self._done2() + # first, find out how large the file is going to be d = self.call("get_size") d.addCallback(self._got_size) @@ -249,11 +249,14 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): def _done(self, res): self._f.close() self._f = None - self._readers = [] self.log(format="done fetching ciphertext, size=%(size)d", size=os.stat(self._incoming_file)[stat.ST_SIZE], level=log.NOISY) os.rename(self._incoming_file, self._encoding_file) + return self._done2() + + def _done2(self): + self._readers = [] self._done_observers.fire(None) def _failed(self, f): diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 7707a151..fd8cbf80 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -7,7 +7,8 @@ from foolscap import Tub, eventual from foolscap.logging import log from allmydata import upload, offloaded -from allmydata.util import hashutil, fileutil +from allmydata.util import hashutil, fileutil, idlib +from pycryptopp.cipher.aes import AES MiB = 1024*1024 @@ -106,6 +107,48 @@ class AssistedUpload(unittest.TestCase): return d + def test_previous_upload_failed(self): + self.basedir = "helper/AssistedUpload/test_previous_upload_failed" + self.setUpHelper(self.basedir) + DATA = "I need help\n" * 1000 + + # we want to make sure that an upload which fails (leaving the + # ciphertext in the CHK_encoding/ directory) does not prevent a later + # attempt to upload that file from working. We simulate this by + # populating the directory manually. + key = hashutil.key_hash(DATA)[:16] + encryptor = AES(key) + SI = hashutil.storage_index_chk_hash(key) + SI_s = idlib.b2a(SI) + encfile = os.path.join(self.basedir, "CHK_encoding", SI_s) + f = open(encfile, "wb") + f.write(encryptor.process(DATA)) + f.close() + + u = upload.Uploader(self.helper_furl) + u.setServiceParent(self.s) + + # wait a few turns + d = eventual.fireEventually() + d.addCallback(eventual.fireEventually) + d.addCallback(eventual.fireEventually) + + def _ready(res): + assert u._helper + return u.upload_data(DATA) + d.addCallback(_ready) + def _uploaded(uri): + assert "CHK" in uri + d.addCallback(_uploaded) + + def _check_empty(res): + files = os.listdir(os.path.join(self.basedir, "CHK_encoding")) + self.failUnlessEqual(files, []) + files = os.listdir(os.path.join(self.basedir, "CHK_incoming")) + self.failUnlessEqual(files, []) + d.addCallback(_check_empty) + + return d def test_already_uploaded(self): self.basedir = "helper/AssistedUpload/test_already_uploaded"