Refactor some behavior into a mixin, and add tests for the behavior described in...
author"Kevan Carstensen" <kevan@isnotajoke.com>
Fri, 30 Oct 2009 09:19:08 +0000 (02:19 -0700)
committer"Kevan Carstensen" <kevan@isnotajoke.com>
Fri, 30 Oct 2009 09:19:08 +0000 (02:19 -0700)
src/allmydata/test/test_upload.py

index 22497f3ec9d12e6d176cfe7fb2e7d2a75233a5c7..2370e489d379ceb63dfe52729ba5483480824f4d 100644 (file)
@@ -1,5 +1,5 @@
 
-import os
+import os, shutil
 from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.python.failure import Failure
@@ -9,7 +9,7 @@ from foolscap.api import fireEventually
 
 import allmydata # for __full_version__
 from allmydata import uri, monitor, client
-from allmydata.immutable import upload
+from allmydata.immutable import upload, encode
 from allmydata.interfaces import FileTooLargeError, NoSharesError, \
      NotEnoughSharesError
 from allmydata.util.assertutil import precondition
@@ -17,6 +17,7 @@ from allmydata.util.deferredutil import DeferredListShouldSucceed
 from no_network import GridTestMixin
 from common_util import ShouldFailMixin
 from allmydata.storage_client import StorageFarmBroker
+from allmydata.storage.server import storage_index_to_dir
 
 MiB = 1024*1024
 
@@ -87,6 +88,15 @@ class Uploadable(unittest.TestCase):
 class ServerError(Exception):
     pass
 
+class SetDEPMixin:
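+    # Shared helper for tests that need to set the client's default
+    # encoding parameters (k, happy, n, max_segment_size) in one call.
+    # Assumes self.node has already been set to the client under test.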
+    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
+        p = {"k": k,
+             "happy": happy,
+             "n": n,
+             "max_segment_size": max_segsize,
+             }
+        self.node.DEFAULT_ENCODING_PARAMETERS = p
+
 class FakeStorageServer:
     def __init__(self, mode):
         self.mode = mode
@@ -234,21 +244,13 @@ def upload_filehandle(uploader, fh):
     u = upload.FileHandle(fh, convergence=None)
     return uploader.upload(u)
 
-class GoodServer(unittest.TestCase, ShouldFailMixin):
+class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
     def setUp(self):
         self.node = FakeClient(mode="good")
         self.u = upload.Uploader()
         self.u.running = True
         self.u.parent = self.node
 
-    def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
-        p = {"k": k,
-             "happy": happy,
-             "n": n,
-             "max_segment_size": max_segsize,
-             }
-        self.node.DEFAULT_ENCODING_PARAMETERS = p
-
     def _check_small(self, newuri, size):
         u = uri.from_string(newuri)
         self.failUnless(isinstance(u, uri.LiteralFileURI))
@@ -372,7 +374,7 @@ class GoodServer(unittest.TestCase, ShouldFailMixin):
         d.addCallback(self._check_large, SIZE_LARGE)
         return d
 
-class ServerErrors(unittest.TestCase, ShouldFailMixin):
+class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
     def make_node(self, mode, num_servers=10):
         self.node = FakeClient(mode, num_servers)
         self.u = upload.Uploader()
@@ -672,7 +674,94 @@ class StorageIndex(unittest.TestCase):
         d.addCallback(_done)
         return d
 
-class EncodingParameters(GridTestMixin, unittest.TestCase):
+class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
+    ShouldFailMixin):
+    def _do_upload_with_broken_servers(self, servers_to_break):
+        """
+        I act like a normal upload, but before I send the results of
+        Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
+        PeerTrackers in the used_peers part of the return result.
+        """
+        assert self.g, "I tried to find a grid at self.g, but failed"
+        broker = self.g.clients[0].storage_broker
+        sh     = self.g.clients[0]._secret_holder
+        data = upload.Data("data" * 10000, convergence="")
+        data.encoding_param_k = 3
+        data.encoding_param_happy = 4
+        data.encoding_param_n = 10
+        uploadable = upload.EncryptAnUploadable(data)
+        encoder = encode.Encoder()
+        encoder.set_encrypted_uploadable(uploadable)
+        status = upload.UploadStatus()
+        selector = upload.Tahoe2PeerSelector("dglev", "test", status)
+        storage_index = encoder.get_param("storage_index")
+        share_size = encoder.get_param("share_size")
+        block_size = encoder.get_param("block_size")
+        num_segments = encoder.get_param("num_segments")
+        d = selector.get_shareholders(broker, sh, storage_index,
+                                      share_size, block_size, num_segments,
+                                      10, 4)
+        def _have_shareholders((used_peers, already_peers)):
+            assert servers_to_break <= len(used_peers)
+            for index in xrange(servers_to_break):
+                server = list(used_peers)[index]
+                for share in server.buckets.keys():
+                    server.buckets[share].abort()
+            buckets = {}
+            for peer in used_peers:
+                buckets.update(peer.buckets)
+            encoder.set_shareholders(buckets)
+            d = encoder.start()
+            return d
+        d.addCallback(_have_shareholders)
+        return d
+
+    def _add_server_with_share(self, server_number, share_number=None,
+                               readonly=False):
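+        # Add a new (optionally read-only) server to the grid. If
+        # share_number is given, copy that share from the first server's
+        # storage directory into the new server's, so that the new server
+        # reports holding it.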
+        assert self.g, "I tried to find a grid at self.g, but failed"
+        assert self.shares, "I tried to find shares at self.shares, but failed"
+        ss = self.g.make_server(server_number, readonly)
+        self.g.add_server(server_number, ss)
+        if share_number is not None:
+            # Copy share i from the directory associated with the first 
+            # storage server to the directory associated with this one.
+            old_share_location = self.shares[share_number][2]
+            new_share_location = os.path.join(ss.storedir, "shares")
+            si = uri.from_string(self.uri).get_storage_index()
+            new_share_location = os.path.join(new_share_location,
+                                              storage_index_to_dir(si))
+            if not os.path.exists(new_share_location):
+                os.makedirs(new_share_location)
+            new_share_location = os.path.join(new_share_location,
+                                              str(share_number))
+            shutil.copy(old_share_location, new_share_location)
+            shares = self.find_shares(self.uri)
+            # Make sure that the storage server has the share.
+            self.failUnless((share_number, ss.my_nodeid, new_share_location)
+                            in shares)
+
+    def _setup_and_upload(self):
+        """
+        I set up a NoNetworkGrid with a single server and client,
+        upload a file to it, store its uri in self.uri, and store its
+        sharedata in self.shares.
+        """
+        self.set_up_grid(num_clients=1, num_servers=1)
+        client = self.g.clients[0]
+        client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
+        data = upload.Data("data" * 10000, convergence="")
+        self.data = data
+        d = client.upload(data)
+        def _store_uri(ur):
+            self.uri = ur.uri
+        d.addCallback(_store_uri)
+        d.addCallback(lambda ign:
+            self.find_shares(self.uri))
+        def _store_shares(shares):
+            self.shares = shares
+        d.addCallback(_store_shares)
+        return d
+
     def test_configure_parameters(self):
         self.basedir = self.mktemp()
         hooks = {0: self._set_up_nodes_extra_config}
@@ -692,6 +781,233 @@ class EncodingParameters(GridTestMixin, unittest.TestCase):
         d.addCallback(_check)
         return d
 
+    def _setUp(self, ns):
+        # Used by test_happy_semantics and test_prexisting_share_behavior
+        # to set up the grid.
+        self.node = FakeClient(mode="good", num_servers=ns)
+        self.u = upload.Uploader()
+        self.u.running = True
+        self.u.parent = self.node
+
+    def test_happy_semantics(self):
+        self._setUp(2)
+        DATA = upload.Data("kittens" * 10000, convergence="")
+        # These parameters are unsatisfiable with the client that we've made
+        # -- we'll use them to test that the semantics work correctly.
+        self.set_encoding_parameters(k=3, happy=5, n=10)
+        d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
+                            "shares could only be placed on 2 servers "
+                            "(5 were requested)",
+                            self.u.upload, DATA)
+        # Let's reset the client to have 10 servers
+        d.addCallback(lambda ign:
+            self._setUp(10))
+        # These parameters are satisfiable with the client we've made.
+        d.addCallback(lambda ign:
+            self.set_encoding_parameters(k=3, happy=5, n=10))
+        # this should work
+        d.addCallback(lambda ign:
+            self.u.upload(DATA))
+        # Let's reset the client to have 7 servers
+        # (this is less than n, but more than h)
+        d.addCallback(lambda ign:
+            self._setUp(7))
+        # These encoding parameters should still be satisfiable with our 
+        # client setup
+        d.addCallback(lambda ign:
+            self.set_encoding_parameters(k=3, happy=5, n=10))
+        # This, then, should work.
+        d.addCallback(lambda ign:
+            self.u.upload(DATA))
+        return d
+
+    def test_problem_layouts(self):
+        self.basedir = self.mktemp()
+        # This scenario is at 
+        # http://allmydata.org/trac/tahoe/ticket/778#comment:52
+        #
+        # The scenario in comment:52 proposes that we have a layout
+        # like:
+        # server 1: share 1
+        # server 2: share 1
+        # server 3: share 1
+        # server 4: shares 2 - 10
+        # To get access to the shares, we will first upload to one 
+        # server, which will then have shares 1 - 10. We'll then 
+        # add three new servers, configure them to not accept any new
+        # shares, then write share 1 directly into the serverdir of each.
+        # Then each of servers 1 - 3 will report that they have share 1, 
+        # and will not accept any new share, while server 4 will report that
+        # it has shares 2 - 10 and will accept new shares.
+        # We'll then set 'happy' = 4, and see that an upload fails
+        # (as it should)
+        d = self._setup_and_upload()
+        d.addCallback(lambda ign:
+            self._add_server_with_share(1, 0, True))
+        d.addCallback(lambda ign:
+            self._add_server_with_share(2, 0, True))
+        d.addCallback(lambda ign:
+            self._add_server_with_share(3, 0, True))
+        # Remove the first share from server 0.
+        def _remove_share_0():
+            share_location = self.shares[0][2]
+            os.remove(share_location)
+        d.addCallback(lambda ign:
+            _remove_share_0())
+        # Set happy = 4 in the client.
+        def _prepare():
+            client = self.g.clients[0]
+            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
+            return client
+        d.addCallback(lambda ign:
+            _prepare())
+        # Uploading data should fail
+        d.addCallback(lambda client:
+            self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
+                            "shares could only be placed on 1 servers "
+                            "(4 were requested)",
+                            client.upload, upload.Data("data" * 10000,
+                                                       convergence="")))
+
+
+        # This scenario is at
+        # http://allmydata.org/trac/tahoe/ticket/778#comment:53
+        #
+        # Set up the grid to have one server
+        def _change_basedir(ign):
+            self.basedir = self.mktemp()
+        d.addCallback(_change_basedir)
+        d.addCallback(lambda ign:
+            self._setup_and_upload())
+        # We want to have a layout like this:
+        # server 1: share 1
+        # server 2: share 2
+        # server 3: share 3
+        # server 4: shares 1 - 10
+        # (this is an expansion of Zooko's example because it is easier
+        #  to code, but it will fail in the same way)
+        # To start, we'll create a server with shares 1-10 of the data 
+        # we're about to upload.
+        # Next, we'll add three new servers to our NoNetworkGrid. We'll add
+        # one share from our initial upload to each of these.
+        # The counterintuitive ordering of the share numbers is to deal with 
+        # the permuting of these servers -- distributing the shares this 
+        # way ensures that the Tahoe2PeerSelector sees them in the order 
+        # described above.
+        d.addCallback(lambda ign:
+            self._add_server_with_share(server_number=1, share_number=2))
+        d.addCallback(lambda ign:
+            self._add_server_with_share(server_number=2, share_number=0))
+        d.addCallback(lambda ign:
+            self._add_server_with_share(server_number=3, share_number=1))
+        # So, we now have the following layout:
+        # server 0: shares 1 - 10
+        # server 1: share 0
+        # server 2: share 1
+        # server 3: share 2
+        # We want to change the 'happy' parameter in the client to 4. 
+        # We then want to feed the upload process a list of peers that
+        # server 0 is at the front of, so we trigger Zooko's scenario.
+        # Ideally, a reupload of our original data should work.
+        def _reset_encoding_parameters(ign):
+            client = self.g.clients[0]
+            client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
+            return client
+        d.addCallback(_reset_encoding_parameters)
+        # We need this to get around the fact that the old Data 
+        # instance already has a happy parameter set.
+        d.addCallback(lambda client:
+            client.upload(upload.Data("data" * 10000, convergence="")))
+        return d
+
+
+    def test_dropped_servers_in_encoder(self):
+        def _set_basedir(ign=None):
+            self.basedir = self.mktemp()
+        _set_basedir()
+        d = self._setup_and_upload()
+        # Add five servers. The first is read-only and gets one share copied
+        # from the original upload; the rest start out empty.
+        def _do_server_setup(ign):
+            self._add_server_with_share(1, 1, True)
+            self._add_server_with_share(2)
+            self._add_server_with_share(3)
+            self._add_server_with_share(4)
+            self._add_server_with_share(5)
+        d.addCallback(_do_server_setup)
+        # remove the original server
+        # (necessary to ensure that the Tahoe2PeerSelector will distribute
+        #  all the shares)
+        def _remove_server(ign):
+            server = self.g.servers_by_number[0]
+            self.g.remove_server(server.my_nodeid)
+        d.addCallback(_remove_server)
+        # This should succeed.
+        d.addCallback(lambda ign:
+            self._do_upload_with_broken_servers(1))
+        # Now, do the same thing over again, but drop 2 servers instead
+        # of 1. This should fail.
+        d.addCallback(_set_basedir)
+        d.addCallback(lambda ign:
+            self._setup_and_upload())
+        d.addCallback(_do_server_setup)
+        d.addCallback(_remove_server)
+        d.addCallback(lambda ign:
+            self.shouldFail(NotEnoughSharesError,
+                            "test_dropped_server_in_encoder", "",
+                            self._do_upload_with_broken_servers, 2))
+        return d
+
+
+    def test_servers_with_unique_shares(self):
+        # servers_with_unique_shares expects a dict of 
+        # shnum => peerid as a preexisting shares argument.
+        test1 = {
+                 1 : "server1",
+                 2 : "server2",
+                 3 : "server3",
+                 4 : "server4"
+                }
+        unique_servers = upload.servers_with_unique_shares(test1)
+        self.failUnlessEqual(4, len(unique_servers))
+        for server in ["server1", "server2", "server3", "server4"]:
+            self.failUnlessIn(server, unique_servers)
+        test1[4] = "server1"
+        # Now there should only be 3 unique servers.
+        unique_servers = upload.servers_with_unique_shares(test1)
+        self.failUnlessEqual(3, len(unique_servers))
+        for server in ["server1", "server2", "server3"]:
+            self.failUnlessIn(server, unique_servers)
+        # servers_with_unique_shares expects a set of PeerTracker
+        # instances as a used_peers argument, but only uses the peerid
+        # instance variable to assess uniqueness. So we feed it some fake
+        # PeerTrackers whose only important characteristic is that they 
+        # have peerid set to something.
+        class FakePeerTracker:
+            pass
+        trackers = []
+        for server in ["server5", "server6", "server7", "server8"]:
+            t = FakePeerTracker()
+            t.peerid = server
+            trackers.append(t)
+        # Recall that there are 3 unique servers in test1. Since none of
+        # those overlap with the ones in trackers, we should get 7 back
+        unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
+        self.failUnlessEqual(7, len(unique_servers))
+        expected_servers = ["server" + str(i) for i in xrange(1, 9)]
+        expected_servers.remove("server4")
+        for server in expected_servers:
+            self.failUnlessIn(server, unique_servers)
+        # Now add an overlapping server to trackers.
+        t = FakePeerTracker()
+        t.peerid = "server1"
+        trackers.append(t)
+        unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
+        self.failUnlessEqual(7, len(unique_servers))
+        for server in expected_servers:
+            self.failUnlessIn(server, unique_servers)
+
+
     def _set_up_nodes_extra_config(self, clientdir):
         cfgfn = os.path.join(clientdir, "tahoe.cfg")
         oldcfg = open(cfgfn, "r").read()