]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
encode: handle uploads of the same file multiple times. Unfortunately we have to...
authorBrian Warner <warner@allmydata.com>
Thu, 19 Apr 2007 01:29:10 +0000 (18:29 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 19 Apr 2007 01:29:10 +0000 (18:29 -0700)
src/allmydata/encode.py
src/allmydata/test/test_system.py

index 9cdb87510fd088417df7f84df67c3a6540c8f263..8e2bcc953ad78d87ee99f5f9fe9b76d9abaf9033 100644 (file)
@@ -210,7 +210,12 @@ class Encoder(object):
         return d
 
     def _encoded_segment(self, (shares, shareids), segnum):
-        _assert(set(shareids) == set(self.landlords.keys()),
+        # To generate the URI, we must generate the roothash, so we must
+        # generate all shares, even if we aren't actually giving them to
+        # anybody. This means that the set of share we create will be equal
+        # to or larger than the set of landlords. If we have any landlord who
+        # *doesn't* have a share, that's an error.
+        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                 shareids=shareids, landlords=self.landlords)
         dl = []
         for i in range(len(shares)):
@@ -228,6 +233,8 @@ class Encoder(object):
         return dl
 
     def send_subshare(self, shareid, segment_num, subshare):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_block", segment_num, subshare)
 
@@ -247,6 +254,8 @@ class Encoder(object):
         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
         self.share_root_hashes[shareid] = t[0]
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_block_hashes", all_hashes)
 
@@ -273,13 +282,15 @@ class Encoder(object):
         return defer.DeferredList(dl)
 
     def send_one_share_hash_tree(self, shareid, needed_hashes):
+        if shareid not in self.landlords:
+            return defer.succeed(None)
         sh = self.landlords[shareid]
         return sh.callRemote("put_share_hashes", needed_hashes)
 
     def close_all_shareholders(self):
         log.msg("%s: closing shareholders" % self)
         dl = []
-        for shareid in range(self.num_shares):
+        for shareid in self.landlords:
             dl.append(self.landlords[shareid].callRemote("close"))
         return defer.DeferredList(dl)
 
index d9615dbf2f7395ce7c3c59501d7efe5408336a18..00e903f223637ee80ac2a99570b4e3824b1f7fd9 100644 (file)
@@ -3,10 +3,11 @@ import os
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.application import service
-from allmydata import client, queen
+from allmydata import client, queen, uri, download
 from allmydata.util import idlib, fileutil
 from foolscap.eventual import flushEventualQueue
 from twisted.python import log
+from twisted.python.failure import Failure
 from twisted.web.client import getPage
 
 def flush_but_dont_ignore(res):
@@ -129,6 +130,7 @@ class SystemTest(unittest.TestCase):
         def _do_upload(res):
             log.msg("UPLOADING")
             u = self.clients[0].getServiceNamed("uploader")
+            self.uploader = u
             # we crank the max segsize down to 1024b for the duration of this
             # test, so we can exercise multiple segments. It is important
             # that this is not a multiple of the segment size, so that the
@@ -144,13 +146,26 @@ class SystemTest(unittest.TestCase):
             self.uri = uri
             dl = self.clients[1].getServiceNamed("downloader")
             self.downloader = dl
-            d1 = dl.download_to_data(uri)
-            return d1
         d.addCallback(_upload_done)
-        def _download_done(data):
+
+        def _upload_again(res):
+            # upload again. This ought to be short-circuited, however with
+            # the way we currently generate URIs (i.e. because they include
+            # the roothash), we have to do all of the encoding work, and only
+            # get to save on the upload part.
+            log.msg("UPLOADING AGAIN")
+            options = {"max_segment_size": 1024}
+            d1 = self.uploader.upload_data(DATA, options)
+        d.addCallback(_upload_again)
+
+        def _download_to_data(res):
+            log.msg("DOWNLOADING")
+            return self.downloader.download_to_data(self.uri)
+        d.addCallback(_download_to_data)
+        def _download_to_data_done(data):
             log.msg("download finished")
             self.failUnlessEqual(data, DATA)
-        d.addCallback(_download_done)
+        d.addCallback(_download_to_data_done)
 
         target_filename = os.path.join(self.basedir, "download.target")
         def _download_to_filename(res):
@@ -173,9 +188,31 @@ class SystemTest(unittest.TestCase):
             self.failUnlessEqual(newdata, DATA)
         d.addCallback(_download_to_filehandle_done)
 
+        def _download_nonexistent_uri(res):
+            baduri = self.mangle_uri(self.uri)
+            d1 = self.downloader.download_to_data(baduri)
+            def _baduri_should_fail(res):
+                self.failUnless(isinstance(res, Failure))
+                self.failUnless(res.check(download.NotEnoughPeersError))
+                # TODO: files that have zero peers should get a special kind
+                # of NotEnoughPeersError, which can be used to suggest that
+                # the URI might be wrong or that they've nver uploaded the
+                # file in the first place.
+            d1.addBoth(_baduri_should_fail)
+            return d1
+        d.addCallback(_download_nonexistent_uri)
         return d
     test_upload_and_download.timeout = 600
 
+    def flip_bit(self, good):
+        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
+
+    def mangle_uri(self, gooduri):
+        pieces = list(uri.unpack_uri(gooduri))
+        # [4] is the verifierid
+        pieces[4] = self.flip_bit(pieces[4])
+        return uri.pack_uri(*pieces)
+
     def test_vdrive(self):
         self.basedir = "test_system/SystemTest/test_vdrive"
         self.data = DATA = "Some data to publish to the virtual drive\n"