-import os
+import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
import allmydata # for __full_version__
from allmydata import uri, monitor, client
-from allmydata.immutable import upload
+from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, NoSharesError, \
NotEnoughSharesError
from allmydata.util.assertutil import precondition
from no_network import GridTestMixin
from common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker
+from allmydata.storage.server import storage_index_to_dir
MiB = 1024*1024
class ServerError(Exception):
pass
+class SetDEPMixin:
+ def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
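+        # Install the given k/happy/n/segment-size values as the default
+        # encoding parameters on the test's client node.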
+ p = {"k": k,
+ "happy": happy,
+ "n": n,
+ "max_segment_size": max_segsize,
+ }
+ self.node.DEFAULT_ENCODING_PARAMETERS = p
+
class FakeStorageServer:
def __init__(self, mode):
self.mode = mode
u = upload.FileHandle(fh, convergence=None)
return uploader.upload(u)
-class GoodServer(unittest.TestCase, ShouldFailMixin):
+class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
def setUp(self):
self.node = FakeClient(mode="good")
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
- def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
- p = {"k": k,
- "happy": happy,
- "n": n,
- "max_segment_size": max_segsize,
- }
- self.node.DEFAULT_ENCODING_PARAMETERS = p
-
def _check_small(self, newuri, size):
u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.LiteralFileURI))
d.addCallback(self._check_large, SIZE_LARGE)
return d
-class ServerErrors(unittest.TestCase, ShouldFailMixin):
+class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
def make_node(self, mode, num_servers=10):
self.node = FakeClient(mode, num_servers)
self.u = upload.Uploader()
d.addCallback(_done)
return d
-class EncodingParameters(GridTestMixin, unittest.TestCase):
+class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
+ ShouldFailMixin):
+ def _do_upload_with_broken_servers(self, servers_to_break):
+ """
+ I act like a normal upload, but before I send the results of
+ Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
+ PeerTrackers in the used_peers part of the return result.
+ """
+ assert self.g, "I tried to find a grid at self.g, but failed"
+ broker = self.g.clients[0].storage_broker
+ sh = self.g.clients[0]._secret_holder
+ data = upload.Data("data" * 10000, convergence="")
+ data.encoding_param_k = 3
+ data.encoding_param_happy = 4
+ data.encoding_param_n = 10
+ uploadable = upload.EncryptAnUploadable(data)
+ encoder = encode.Encoder()
+ encoder.set_encrypted_uploadable(uploadable)
+ status = upload.UploadStatus()
+ selector = upload.Tahoe2PeerSelector("dglev", "test", status)
+ storage_index = encoder.get_param("storage_index")
+ share_size = encoder.get_param("share_size")
+ block_size = encoder.get_param("block_size")
+ num_segments = encoder.get_param("num_segments")
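+        # Ask the selector for shareholders for all 10 shares, with a
+        # happiness threshold of 4 (matching the encoding parameters set
+        # on the data above).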
+ d = selector.get_shareholders(broker, sh, storage_index,
+ share_size, block_size, num_segments,
+ 10, 4)
+ def _have_shareholders((used_peers, already_peers)):
+ assert servers_to_break <= len(used_peers)
+ for index in xrange(servers_to_break):
+ server = list(used_peers)[index]
+ for share in server.buckets.keys():
+ server.buckets[share].abort()
+ buckets = {}
+ for peer in used_peers:
+ buckets.update(peer.buckets)
+ encoder.set_shareholders(buckets)
+ d = encoder.start()
+ return d
+ d.addCallback(_have_shareholders)
+ return d
+
+ def _add_server_with_share(self, server_number, share_number=None,
+ readonly=False):
+ assert self.g, "I tried to find a grid at self.g, but failed"
+ assert self.shares, "I tried to find shares at self.shares, but failed"
+ ss = self.g.make_server(server_number, readonly)
+ self.g.add_server(server_number, ss)
+        if share_number is not None:
+            # Copy the specified share from the directory associated with the
+            # first storage server to the directory associated with this one.
+ old_share_location = self.shares[share_number][2]
+ new_share_location = os.path.join(ss.storedir, "shares")
+ si = uri.from_string(self.uri).get_storage_index()
+ new_share_location = os.path.join(new_share_location,
+ storage_index_to_dir(si))
+ if not os.path.exists(new_share_location):
+ os.makedirs(new_share_location)
+ new_share_location = os.path.join(new_share_location,
+ str(share_number))
+ shutil.copy(old_share_location, new_share_location)
+ shares = self.find_shares(self.uri)
+ # Make sure that the storage server has the share.
+ self.failUnless((share_number, ss.my_nodeid, new_share_location)
+ in shares)
+
+ def _setup_and_upload(self):
+ """
+ I set up a NoNetworkGrid with a single server and client,
+ upload a file to it, store its uri in self.uri, and store its
+ sharedata in self.shares.
+ """
+ self.set_up_grid(num_clients=1, num_servers=1)
+ client = self.g.clients[0]
+ client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
+ data = upload.Data("data" * 10000, convergence="")
+ self.data = data
+ d = client.upload(data)
+ def _store_uri(ur):
+ self.uri = ur.uri
+ d.addCallback(_store_uri)
+ d.addCallback(lambda ign:
+ self.find_shares(self.uri))
+ def _store_shares(shares):
+ self.shares = shares
+ d.addCallback(_store_shares)
+ return d
+
def test_configure_parameters(self):
self.basedir = self.mktemp()
hooks = {0: self._set_up_nodes_extra_config}
d.addCallback(_check)
return d
+ def _setUp(self, ns):
+        # Used by test_happy_semantics and test_preexisting_share_behavior
+ # to set up the grid.
+ self.node = FakeClient(mode="good", num_servers=ns)
+ self.u = upload.Uploader()
+ self.u.running = True
+ self.u.parent = self.node
+
+ def test_happy_semantics(self):
+ self._setUp(2)
+ DATA = upload.Data("kittens" * 10000, convergence="")
+        # These parameters are unsatisfiable with the client that we've made
+        # -- we'll use them to test that the semantics work correctly.
+ self.set_encoding_parameters(k=3, happy=5, n=10)
+ d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
+ "shares could only be placed on 2 servers "
+ "(5 were requested)",
+ self.u.upload, DATA)
+ # Let's reset the client to have 10 servers
+ d.addCallback(lambda ign:
+ self._setUp(10))
+ # These parameters are satisfiable with the client we've made.
+ d.addCallback(lambda ign:
+ self.set_encoding_parameters(k=3, happy=5, n=10))
+ # this should work
+ d.addCallback(lambda ign:
+ self.u.upload(DATA))
+ # Let's reset the client to have 7 servers
+ # (this is less than n, but more than h)
+ d.addCallback(lambda ign:
+ self._setUp(7))
+ # These encoding parameters should still be satisfiable with our
+ # client setup
+ d.addCallback(lambda ign:
+ self.set_encoding_parameters(k=3, happy=5, n=10))
+ # This, then, should work.
+ d.addCallback(lambda ign:
+ self.u.upload(DATA))
+ return d
+
+ def test_problem_layouts(self):
+ self.basedir = self.mktemp()
+ # This scenario is at
+ # http://allmydata.org/trac/tahoe/ticket/778#comment:52
+ #
+ # The scenario in comment:52 proposes that we have a layout
+ # like:
+ # server 1: share 1
+ # server 2: share 1
+ # server 3: share 1
+ # server 4: shares 2 - 10
+        # To get the shares we need, we will first upload to one
+        # server, which will then have shares 1 - 10. We'll then
+ # add three new servers, configure them to not accept any new
+ # shares, then write share 1 directly into the serverdir of each.
+ # Then each of servers 1 - 3 will report that they have share 1,
+ # and will not accept any new share, while server 4 will report that
+ # it has shares 2 - 10 and will accept new shares.
+ # We'll then set 'happy' = 4, and see that an upload fails
+ # (as it should)
+ d = self._setup_and_upload()
+ d.addCallback(lambda ign:
+ self._add_server_with_share(1, 0, True))
+ d.addCallback(lambda ign:
+ self._add_server_with_share(2, 0, True))
+ d.addCallback(lambda ign:
+ self._add_server_with_share(3, 0, True))
+ # Remove the first share from server 0.
+ def _remove_share_0():
+ share_location = self.shares[0][2]
+ os.remove(share_location)
+ d.addCallback(lambda ign:
+ _remove_share_0())
+ # Set happy = 4 in the client.
+ def _prepare():
+ client = self.g.clients[0]
+ client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
+ return client
+ d.addCallback(lambda ign:
+ _prepare())
+ # Uploading data should fail
+ d.addCallback(lambda client:
+                        self.shouldFail(NotEnoughSharesError, "test_problem_layouts",
+ "shares could only be placed on 1 servers "
+ "(4 were requested)",
+ client.upload, upload.Data("data" * 10000,
+ convergence="")))
+
+
+ # This scenario is at
+ # http://allmydata.org/trac/tahoe/ticket/778#comment:53
+ #
+ # Set up the grid to have one server
+ def _change_basedir(ign):
+ self.basedir = self.mktemp()
+ d.addCallback(_change_basedir)
+ d.addCallback(lambda ign:
+ self._setup_and_upload())
+ # We want to have a layout like this:
+ # server 1: share 1
+ # server 2: share 2
+ # server 3: share 3
+ # server 4: shares 1 - 10
+ # (this is an expansion of Zooko's example because it is easier
+ # to code, but it will fail in the same way)
+ # To start, we'll create a server with shares 1-10 of the data
+ # we're about to upload.
+ # Next, we'll add three new servers to our NoNetworkGrid. We'll add
+ # one share from our initial upload to each of these.
+ # The counterintuitive ordering of the share numbers is to deal with
+ # the permuting of these servers -- distributing the shares this
+ # way ensures that the Tahoe2PeerSelector sees them in the order
+ # described above.
+ d.addCallback(lambda ign:
+ self._add_server_with_share(server_number=1, share_number=2))
+ d.addCallback(lambda ign:
+ self._add_server_with_share(server_number=2, share_number=0))
+ d.addCallback(lambda ign:
+ self._add_server_with_share(server_number=3, share_number=1))
+ # So, we now have the following layout:
+ # server 0: shares 1 - 10
+ # server 1: share 0
+ # server 2: share 1
+ # server 3: share 2
+ # We want to change the 'happy' parameter in the client to 4.
+ # We then want to feed the upload process a list of peers that
+ # server 0 is at the front of, so we trigger Zooko's scenario.
+ # Ideally, a reupload of our original data should work.
+ def _reset_encoding_parameters(ign):
+ client = self.g.clients[0]
+ client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
+ return client
+ d.addCallback(_reset_encoding_parameters)
+ # We need this to get around the fact that the old Data
+ # instance already has a happy parameter set.
+ d.addCallback(lambda client:
+ client.upload(upload.Data("data" * 10000, convergence="")))
+ return d
+
+
+ def test_dropped_servers_in_encoder(self):
+ def _set_basedir(ign=None):
+ self.basedir = self.mktemp()
+ _set_basedir()
+        d = self._setup_and_upload()
+        # Add one readonly server holding a share from the original upload,
+        # and four servers with no shares.
+ def _do_server_setup(ign):
+ self._add_server_with_share(1, 1, True)
+ self._add_server_with_share(2)
+ self._add_server_with_share(3)
+ self._add_server_with_share(4)
+ self._add_server_with_share(5)
+ d.addCallback(_do_server_setup)
+ # remove the original server
+ # (necessary to ensure that the Tahoe2PeerSelector will distribute
+ # all the shares)
+ def _remove_server(ign):
+ server = self.g.servers_by_number[0]
+ self.g.remove_server(server.my_nodeid)
+ d.addCallback(_remove_server)
+ # This should succeed.
+ d.addCallback(lambda ign:
+ self._do_upload_with_broken_servers(1))
+ # Now, do the same thing over again, but drop 2 servers instead
+ # of 1. This should fail.
+ d.addCallback(_set_basedir)
+ d.addCallback(lambda ign:
+ self._setup_and_upload())
+ d.addCallback(_do_server_setup)
+ d.addCallback(_remove_server)
+ d.addCallback(lambda ign:
+ self.shouldFail(NotEnoughSharesError,
+                        "test_dropped_servers_in_encoder", "",
+ self._do_upload_with_broken_servers, 2))
+ return d
+
+
+ def test_servers_with_unique_shares(self):
+ # servers_with_unique_shares expects a dict of
+ # shnum => peerid as a preexisting shares argument.
+ test1 = {
+ 1 : "server1",
+ 2 : "server2",
+ 3 : "server3",
+ 4 : "server4"
+ }
+ unique_servers = upload.servers_with_unique_shares(test1)
+ self.failUnlessEqual(4, len(unique_servers))
+ for server in ["server1", "server2", "server3", "server4"]:
+ self.failUnlessIn(server, unique_servers)
+ test1[4] = "server1"
+ # Now there should only be 3 unique servers.
+ unique_servers = upload.servers_with_unique_shares(test1)
+ self.failUnlessEqual(3, len(unique_servers))
+ for server in ["server1", "server2", "server3"]:
+ self.failUnlessIn(server, unique_servers)
+ # servers_with_unique_shares expects a set of PeerTracker
+ # instances as a used_peers argument, but only uses the peerid
+ # instance variable to assess uniqueness. So we feed it some fake
+ # PeerTrackers whose only important characteristic is that they
+ # have peerid set to something.
+ class FakePeerTracker:
+ pass
+ trackers = []
+ for server in ["server5", "server6", "server7", "server8"]:
+ t = FakePeerTracker()
+ t.peerid = server
+ trackers.append(t)
+        # Recall that there are 3 unique servers in test1. Since none of
+        # those overlap with the ones in trackers, we should get 7 back.
+ unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
+ self.failUnlessEqual(7, len(unique_servers))
+ expected_servers = ["server" + str(i) for i in xrange(1, 9)]
+ expected_servers.remove("server4")
+ for server in expected_servers:
+ self.failUnlessIn(server, unique_servers)
+ # Now add an overlapping server to trackers.
+ t = FakePeerTracker()
+ t.peerid = "server1"
+ trackers.append(t)
+ unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
+ self.failUnlessEqual(7, len(unique_servers))
+ for server in expected_servers:
+ self.failUnlessIn(server, unique_servers)
+
+
def _set_up_nodes_extra_config(self, clientdir):
cfgfn = os.path.join(clientdir, "tahoe.cfg")
oldcfg = open(cfgfn, "r").read()