From: Brian Warner Date: Thu, 19 Apr 2007 01:29:10 +0000 (-0700) Subject: encode: handle uploads of the same file multiple times. Unfortunately we have to... X-Git-Tag: tahoe_v0.1.0-0-UNSTABLE~48 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/cyclelanguage?a=commitdiff_plain;h=2d0e240466b55fffb7cacbcfe09ae14a30cf8037;p=tahoe-lafs%2Ftahoe-lafs.git encode: handle uploads of the same file multiple times. Unfortunately we have to do almost as much work the second time around, to compute the full URI --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 9cdb8751..8e2bcc95 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -210,7 +210,12 @@ class Encoder(object): return d def _encoded_segment(self, (shares, shareids), segnum): - _assert(set(shareids) == set(self.landlords.keys()), + # To generate the URI, we must generate the roothash, so we must + # generate all shares, even if we aren't actually giving them to + # anybody. This means that the set of share we create will be equal + # to or larger than the set of landlords. If we have any landlord who + # *doesn't* have a share, that's an error. + _assert(set(self.landlords.keys()).issubset(set(shareids)), shareids=shareids, landlords=self.landlords) dl = [] for i in range(len(shares)): @@ -228,6 +233,8 @@ class Encoder(object): return dl def send_subshare(self, shareid, segment_num, subshare): + if shareid not in self.landlords: + return defer.succeed(None) sh = self.landlords[shareid] return sh.callRemote("put_block", segment_num, subshare) @@ -247,6 +254,8 @@ class Encoder(object): # all_hashes[1] is the left child, == hash(ah[3]+ah[4]) # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2]) self.share_root_hashes[shareid] = t[0] + if shareid not in self.landlords: + return defer.succeed(None) sh = self.landlords[shareid] return sh.callRemote("put_block_hashes", all_hashes) @@ -273,13 +282,15 @@ class Encoder(object): return defer.DeferredList(dl) def send_one_share_hash_tree(self, shareid, needed_hashes): + if shareid not in self.landlords: + return defer.succeed(None) sh = self.landlords[shareid] return sh.callRemote("put_share_hashes", needed_hashes) def close_all_shareholders(self): log.msg("%s: closing shareholders" % self) dl = [] - for shareid in range(self.num_shares): + for shareid in self.landlords: dl.append(self.landlords[shareid].callRemote("close")) return defer.DeferredList(dl) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index d9615dbf..00e903f2 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -3,10 +3,11 @@ import os from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.application import service -from allmydata import client, queen +from allmydata import client, queen, uri, download from allmydata.util import idlib, fileutil from foolscap.eventual import flushEventualQueue from twisted.python import log +from twisted.python.failure import Failure from twisted.web.client import getPage def flush_but_dont_ignore(res): @@ -129,6 +130,7 @@ class SystemTest(unittest.TestCase): def _do_upload(res): log.msg("UPLOADING") u = self.clients[0].getServiceNamed("uploader") + self.uploader = u # we crank the max segsize down to 1024b for the duration of this # test, so we can exercise multiple segments. It is important # that this is not a multiple of the segment size, so that the @@ -144,13 +146,26 @@ class SystemTest(unittest.TestCase): self.uri = uri dl = self.clients[1].getServiceNamed("downloader") self.downloader = dl - d1 = dl.download_to_data(uri) - return d1 d.addCallback(_upload_done) - def _download_done(data): + + def _upload_again(res): + # upload again. This ought to be short-circuited, however with + # the way we currently generate URIs (i.e. because they include + # the roothash), we have to do all of the encoding work, and only + # get to save on the upload part. + log.msg("UPLOADING AGAIN") + options = {"max_segment_size": 1024} + d1 = self.uploader.upload_data(DATA, options) + d.addCallback(_upload_again) + + def _download_to_data(res): + log.msg("DOWNLOADING") + return self.downloader.download_to_data(self.uri) + d.addCallback(_download_to_data) + def _download_to_data_done(data): log.msg("download finished") self.failUnlessEqual(data, DATA) - d.addCallback(_download_done) + d.addCallback(_download_to_data_done) target_filename = os.path.join(self.basedir, "download.target") def _download_to_filename(res): @@ -173,9 +188,31 @@ class SystemTest(unittest.TestCase): self.failUnlessEqual(newdata, DATA) d.addCallback(_download_to_filehandle_done) + def _download_nonexistent_uri(res): + baduri = self.mangle_uri(self.uri) + d1 = self.downloader.download_to_data(baduri) + def _baduri_should_fail(res): + self.failUnless(isinstance(res, Failure)) + self.failUnless(res.check(download.NotEnoughPeersError)) + # TODO: files that have zero peers should get a special kind + # of NotEnoughPeersError, which can be used to suggest that + # the URI might be wrong or that they've nver uploaded the + # file in the first place. + d1.addBoth(_baduri_should_fail) + return d1 + d.addCallback(_download_nonexistent_uri) return d test_upload_and_download.timeout = 600 + def flip_bit(self, good): + return good[:-1] + chr(ord(good[-1]) ^ 0x01) + + def mangle_uri(self, gooduri): + pieces = list(uri.unpack_uri(gooduri)) + # [4] is the verifierid + pieces[4] = self.flip_bit(pieces[4]) + return uri.pack_uri(*pieces) + def test_vdrive(self): self.basedir = "test_system/SystemTest/test_vdrive" self.data = DATA = "Some data to publish to the virtual drive\n"