From: "Kevan Carstensen" Date: Fri, 30 Oct 2009 09:19:08 +0000 (-0700) Subject: Refactor some behavior into a mixin, and add tests for the behavior described in... X-Git-Url: https://git.rkrishnan.org/pf/content/en/service/contact.html?a=commitdiff_plain;h=ee9690b357808f2742067dd022de96d5a57573e3;p=tahoe-lafs%2Ftahoe-lafs.git Refactor some behavior into a mixin, and add tests for the behavior described in #778 --- diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 22497f3e..2370e489 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -1,5 +1,5 @@ -import os +import os, shutil from cStringIO import StringIO from twisted.trial import unittest from twisted.python.failure import Failure @@ -9,7 +9,7 @@ from foolscap.api import fireEventually import allmydata # for __full_version__ from allmydata import uri, monitor, client -from allmydata.immutable import upload +from allmydata.immutable import upload, encode from allmydata.interfaces import FileTooLargeError, NoSharesError, \ NotEnoughSharesError from allmydata.util.assertutil import precondition @@ -17,6 +17,7 @@ from allmydata.util.deferredutil import DeferredListShouldSucceed from no_network import GridTestMixin from common_util import ShouldFailMixin from allmydata.storage_client import StorageFarmBroker +from allmydata.storage.server import storage_index_to_dir MiB = 1024*1024 @@ -87,6 +88,15 @@ class Uploadable(unittest.TestCase): class ServerError(Exception): pass +class SetDEPMixin: + def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB): + p = {"k": k, + "happy": happy, + "n": n, + "max_segment_size": max_segsize, + } + self.node.DEFAULT_ENCODING_PARAMETERS = p + class FakeStorageServer: def __init__(self, mode): self.mode = mode @@ -234,21 +244,13 @@ def upload_filehandle(uploader, fh): u = upload.FileHandle(fh, convergence=None) return uploader.upload(u) -class GoodServer(unittest.TestCase, ShouldFailMixin): +class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin): def setUp(self): self.node = FakeClient(mode="good") self.u = upload.Uploader() self.u.running = True self.u.parent = self.node - def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB): - p = {"k": k, - "happy": happy, - "n": n, - "max_segment_size": max_segsize, - } - self.node.DEFAULT_ENCODING_PARAMETERS = p - def _check_small(self, newuri, size): u = uri.from_string(newuri) self.failUnless(isinstance(u, uri.LiteralFileURI)) @@ -372,7 +374,7 @@ class GoodServer(unittest.TestCase, ShouldFailMixin): d.addCallback(self._check_large, SIZE_LARGE) return d -class ServerErrors(unittest.TestCase, ShouldFailMixin): +class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin): def make_node(self, mode, num_servers=10): self.node = FakeClient(mode, num_servers) self.u = upload.Uploader() @@ -672,7 +674,94 @@ class StorageIndex(unittest.TestCase): d.addCallback(_done) return d -class EncodingParameters(GridTestMixin, unittest.TestCase): +class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, + ShouldFailMixin): + def _do_upload_with_broken_servers(self, servers_to_break): + """ + I act like a normal upload, but before I send the results of + Tahoe2PeerSelector to the Encoder, I break the first servers_to_break + PeerTrackers in the used_peers part of the return result. + """ + assert self.g, "I tried to find a grid at self.g, but failed" + broker = self.g.clients[0].storage_broker + sh = self.g.clients[0]._secret_holder + data = upload.Data("data" * 10000, convergence="") + data.encoding_param_k = 3 + data.encoding_param_happy = 4 + data.encoding_param_n = 10 + uploadable = upload.EncryptAnUploadable(data) + encoder = encode.Encoder() + encoder.set_encrypted_uploadable(uploadable) + status = upload.UploadStatus() + selector = upload.Tahoe2PeerSelector("dglev", "test", status) + storage_index = encoder.get_param("storage_index") + share_size = encoder.get_param("share_size") + block_size = encoder.get_param("block_size") + num_segments = encoder.get_param("num_segments") + d = selector.get_shareholders(broker, sh, storage_index, + share_size, block_size, num_segments, + 10, 4) + def _have_shareholders((used_peers, already_peers)): + assert servers_to_break <= len(used_peers) + for index in xrange(servers_to_break): + server = list(used_peers)[index] + for share in server.buckets.keys(): + server.buckets[share].abort() + buckets = {} + for peer in used_peers: + buckets.update(peer.buckets) + encoder.set_shareholders(buckets) + d = encoder.start() + return d + d.addCallback(_have_shareholders) + return d + + def _add_server_with_share(self, server_number, share_number=None, + readonly=False): + assert self.g, "I tried to find a grid at self.g, but failed" + assert self.shares, "I tried to find shares at self.shares, but failed" + ss = self.g.make_server(server_number, readonly) + self.g.add_server(server_number, ss) + if share_number: + # Copy share i from the directory associated with the first + # storage server to the directory associated with this one. + old_share_location = self.shares[share_number][2] + new_share_location = os.path.join(ss.storedir, "shares") + si = uri.from_string(self.uri).get_storage_index() + new_share_location = os.path.join(new_share_location, + storage_index_to_dir(si)) + if not os.path.exists(new_share_location): + os.makedirs(new_share_location) + new_share_location = os.path.join(new_share_location, + str(share_number)) + shutil.copy(old_share_location, new_share_location) + shares = self.find_shares(self.uri) + # Make sure that the storage server has the share. + self.failUnless((share_number, ss.my_nodeid, new_share_location) + in shares) + + def _setup_and_upload(self): + """ + I set up a NoNetworkGrid with a single server and client, + upload a file to it, store its uri in self.uri, and store its + sharedata in self.shares. + """ + self.set_up_grid(num_clients=1, num_servers=1) + client = self.g.clients[0] + client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1 + data = upload.Data("data" * 10000, convergence="") + self.data = data + d = client.upload(data) + def _store_uri(ur): + self.uri = ur.uri + d.addCallback(_store_uri) + d.addCallback(lambda ign: + self.find_shares(self.uri)) + def _store_shares(shares): + self.shares = shares + d.addCallback(_store_shares) + return d + def test_configure_parameters(self): self.basedir = self.mktemp() hooks = {0: self._set_up_nodes_extra_config} @@ -692,6 +781,233 @@ class EncodingParameters(GridTestMixin, unittest.TestCase): d.addCallback(_check) return d + def _setUp(self, ns): + # Used by test_happy_semantics and test_prexisting_share_behavior + # to set up the grid. + self.node = FakeClient(mode="good", num_servers=ns) + self.u = upload.Uploader() + self.u.running = True + self.u.parent = self.node + + def test_happy_semantics(self): + self._setUp(2) + DATA = upload.Data("kittens" * 10000, convergence="") + # These parameters are unsatisfiable with the client that we've made + # -- we'll use them to test that the semnatics work correctly. + self.set_encoding_parameters(k=3, happy=5, n=10) + d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics", + "shares could only be placed on 2 servers " + "(5 were requested)", + self.u.upload, DATA) + # Let's reset the client to have 10 servers + d.addCallback(lambda ign: + self._setUp(10)) + # These parameters are satisfiable with the client we've made. + d.addCallback(lambda ign: + self.set_encoding_parameters(k=3, happy=5, n=10)) + # this should work + d.addCallback(lambda ign: + self.u.upload(DATA)) + # Let's reset the client to have 7 servers + # (this is less than n, but more than h) + d.addCallback(lambda ign: + self._setUp(7)) + # These encoding parameters should still be satisfiable with our + # client setup + d.addCallback(lambda ign: + self.set_encoding_parameters(k=3, happy=5, n=10)) + # This, then, should work. + d.addCallback(lambda ign: + self.u.upload(DATA)) + return d + + def test_problem_layouts(self): + self.basedir = self.mktemp() + # This scenario is at + # http://allmydata.org/trac/tahoe/ticket/778#comment:52 + # + # The scenario in comment:52 proposes that we have a layout + # like: + # server 1: share 1 + # server 2: share 1 + # server 3: share 1 + # server 4: shares 2 - 10 + # To get access to the shares, we will first upload to one + # server, which will then have shares 1 - 10. We'll then + # add three new servers, configure them to not accept any new + # shares, then write share 1 directly into the serverdir of each. + # Then each of servers 1 - 3 will report that they have share 1, + # and will not accept any new share, while server 4 will report that + # it has shares 2 - 10 and will accept new shares. + # We'll then set 'happy' = 4, and see that an upload fails + # (as it should) + d = self._setup_and_upload() + d.addCallback(lambda ign: + self._add_server_with_share(1, 0, True)) + d.addCallback(lambda ign: + self._add_server_with_share(2, 0, True)) + d.addCallback(lambda ign: + self._add_server_with_share(3, 0, True)) + # Remove the first share from server 0. + def _remove_share_0(): + share_location = self.shares[0][2] + os.remove(share_location) + d.addCallback(lambda ign: + _remove_share_0()) + # Set happy = 4 in the client. + def _prepare(): + client = self.g.clients[0] + client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4 + return client + d.addCallback(lambda ign: + _prepare()) + # Uploading data should fail + d.addCallback(lambda client: + self.shouldFail(NotEnoughSharesError, "test_happy_semantics", + "shares could only be placed on 1 servers " + "(4 were requested)", + client.upload, upload.Data("data" * 10000, + convergence=""))) + + + # This scenario is at + # http://allmydata.org/trac/tahoe/ticket/778#comment:53 + # + # Set up the grid to have one server + def _change_basedir(ign): + self.basedir = self.mktemp() + d.addCallback(_change_basedir) + d.addCallback(lambda ign: + self._setup_and_upload()) + # We want to have a layout like this: + # server 1: share 1 + # server 2: share 2 + # server 3: share 3 + # server 4: shares 1 - 10 + # (this is an expansion of Zooko's example because it is easier + # to code, but it will fail in the same way) + # To start, we'll create a server with shares 1-10 of the data + # we're about to upload. + # Next, we'll add three new servers to our NoNetworkGrid. We'll add + # one share from our initial upload to each of these. + # The counterintuitive ordering of the share numbers is to deal with + # the permuting of these servers -- distributing the shares this + # way ensures that the Tahoe2PeerSelector sees them in the order + # described above. + d.addCallback(lambda ign: + self._add_server_with_share(server_number=1, share_number=2)) + d.addCallback(lambda ign: + self._add_server_with_share(server_number=2, share_number=0)) + d.addCallback(lambda ign: + self._add_server_with_share(server_number=3, share_number=1)) + # So, we now have the following layout: + # server 0: shares 1 - 10 + # server 1: share 0 + # server 2: share 1 + # server 3: share 2 + # We want to change the 'happy' parameter in the client to 4. + # We then want to feed the upload process a list of peers that + # server 0 is at the front of, so we trigger Zooko's scenario. + # Ideally, a reupload of our original data should work. + def _reset_encoding_parameters(ign): + client = self.g.clients[0] + client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4 + return client + d.addCallback(_reset_encoding_parameters) + # We need this to get around the fact that the old Data + # instance already has a happy parameter set. + d.addCallback(lambda client: + client.upload(upload.Data("data" * 10000, convergence=""))) + return d + + + def test_dropped_servers_in_encoder(self): + def _set_basedir(ign=None): + self.basedir = self.mktemp() + _set_basedir() + d = self._setup_and_upload(); + # Add 5 servers, with one share each from the original + # Add a readonly server + def _do_server_setup(ign): + self._add_server_with_share(1, 1, True) + self._add_server_with_share(2) + self._add_server_with_share(3) + self._add_server_with_share(4) + self._add_server_with_share(5) + d.addCallback(_do_server_setup) + # remove the original server + # (necessary to ensure that the Tahoe2PeerSelector will distribute + # all the shares) + def _remove_server(ign): + server = self.g.servers_by_number[0] + self.g.remove_server(server.my_nodeid) + d.addCallback(_remove_server) + # This should succeed. + d.addCallback(lambda ign: + self._do_upload_with_broken_servers(1)) + # Now, do the same thing over again, but drop 2 servers instead + # of 1. This should fail. + d.addCallback(_set_basedir) + d.addCallback(lambda ign: + self._setup_and_upload()) + d.addCallback(_do_server_setup) + d.addCallback(_remove_server) + d.addCallback(lambda ign: + self.shouldFail(NotEnoughSharesError, + "test_dropped_server_in_encoder", "", + self._do_upload_with_broken_servers, 2)) + return d + + + def test_servers_with_unique_shares(self): + # servers_with_unique_shares expects a dict of + # shnum => peerid as a preexisting shares argument. + test1 = { + 1 : "server1", + 2 : "server2", + 3 : "server3", + 4 : "server4" + } + unique_servers = upload.servers_with_unique_shares(test1) + self.failUnlessEqual(4, len(unique_servers)) + for server in ["server1", "server2", "server3", "server4"]: + self.failUnlessIn(server, unique_servers) + test1[4] = "server1" + # Now there should only be 3 unique servers. + unique_servers = upload.servers_with_unique_shares(test1) + self.failUnlessEqual(3, len(unique_servers)) + for server in ["server1", "server2", "server3"]: + self.failUnlessIn(server, unique_servers) + # servers_with_unique_shares expects a set of PeerTracker + # instances as a used_peers argument, but only uses the peerid + # instance variable to assess uniqueness. So we feed it some fake + # PeerTrackers whose only important characteristic is that they + # have peerid set to something. + class FakePeerTracker: + pass + trackers = [] + for server in ["server5", "server6", "server7", "server8"]: + t = FakePeerTracker() + t.peerid = server + trackers.append(t) + # Recall that there are 3 unique servers in test1. Since none of + # those overlap with the ones in trackers, we should get 7 back + unique_servers = upload.servers_with_unique_shares(test1, set(trackers)) + self.failUnlessEqual(7, len(unique_servers)) + expected_servers = ["server" + str(i) for i in xrange(1, 9)] + expected_servers.remove("server4") + for server in expected_servers: + self.failUnlessIn(server, unique_servers) + # Now add an overlapping server to trackers. + t = FakePeerTracker() + t.peerid = "server1" + trackers.append(t) + unique_servers = upload.servers_with_unique_shares(test1, set(trackers)) + self.failUnlessEqual(7, len(unique_servers)) + for server in expected_servers: + self.failUnlessIn(server, unique_servers) + + def _set_up_nodes_extra_config(self, clientdir): cfgfn = os.path.join(clientdir, "tahoe.cfg") oldcfg = open(cfgfn, "r").read()