This patch also renames some instances of "find_shares()" to "find_all_shares()" and other instances to "find_uri_shares()" as appropriate -- having those two distinct operations conflated under a single name confused me at first when writing these tests.
d.addCallback(_stash_it)
return d
- def find_shares(self, unused=None):
+ def find_all_shares(self, unused=None):
"""Locate shares on disk. Returns a dict that maps
(clientnum,sharenum) to a string that contains the share container
(copied directly from the disk, containing leases etc). You can
def replace_shares(self, newshares, storage_index):
"""Replace shares on disk. Takes a dictionary in the same form
- as find_shares() returns."""
+ as find_all_shares() returns."""
for i, c in enumerate(self.clients):
sharedir = c.getServiceNamed("storage").sharedir
def _delete_a_share(self, unused=None, sharenum=None):
""" Delete one share. """
- shares = self.find_shares()
+ shares = self.find_all_shares()
ks = shares.keys()
if sharenum is not None:
k = [ key for key in shares.keys() if key[1] == sharenum ][0]
return unused
def _corrupt_a_share(self, unused, corruptor_func, sharenum):
- shares = self.find_shares()
+ shares = self.find_all_shares()
ks = [ key for key in shares.keys() if key[1] == sharenum ]
assert ks, (shares.keys(), sharenum)
k = ks[0]
def _corrupt_all_shares(self, unused, corruptor_func):
""" All shares on disk will be corrupted by corruptor_func. """
- shares = self.find_shares()
+ shares = self.find_all_shares()
for k in shares.keys():
self._corrupt_a_share(unused, corruptor_func, k[1])
return corruptor_func
def _corrupt_a_random_share(self, unused, corruptor_func):
""" Exactly one share on disk will be corrupted by corruptor_func. """
- shares = self.find_shares()
+ shares = self.find_all_shares()
ks = shares.keys()
k = random.choice(ks)
self._corrupt_a_share(unused, corruptor_func, k[1])
ss = self.g.servers_by_number[i]
yield (i, ss, ss.storedir)
- def find_shares(self, uri):
+ def find_uri_shares(self, uri):
si = tahoe_uri.from_string(uri).get_storage_index()
prefixdir = storage_index_to_dir(si)
shares = []
os.unlink(sharefile)
def delete_shares_numbered(self, uri, shnums):
- for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
+ for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
if i_shnum in shnums:
os.unlink(i_sharefile)
open(sharefile, "wb").write(corruptdata)
def corrupt_shares_numbered(self, uri, shnums, corruptor, debug=False):
- for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
+ for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
if i_shnum in shnums:
sharedata = open(i_sharefile, "rb").read()
corruptdata = corruptor(sharedata, debug=debug)
def _clobber_shares(ignored):
# delete one, corrupt a second
- shares = self.find_shares(self.uri)
+ shares = self.find_uri_shares(self.uri)
self.failUnlessReallyEqual(len(shares), 10)
os.unlink(shares[0][2])
cso = debug.CorruptShareOptions()
d.addCallback(_check_stats)
def _clobber_shares(ignored):
- shares = self.find_shares(self.uris[u"gööd"])
+ shares = self.find_uri_shares(self.uris[u"gööd"])
self.failUnlessReallyEqual(len(shares), 10)
os.unlink(shares[0][2])
- shares = self.find_shares(self.uris["mutable"])
+ shares = self.find_uri_shares(self.uris["mutable"])
cso = debug.CorruptShareOptions()
cso.stdout = StringIO()
cso.parseOptions([shares[1][2]])
self.delete_shares_numbered(node.get_uri(), [0,1])
def _corrupt_some_shares(self, node):
- for (shnum, serverid, sharefile) in self.find_shares(node.get_uri()):
+ for (shnum, serverid, sharefile) in self.find_uri_shares(node.get_uri()):
if shnum in (0,1):
self._run_cli(["debug", "corrupt-share", sharefile])
os.makedirs(si_dir)
new_sharefile = os.path.join(si_dir, str(sharenum))
shutil.copy(sharefile, new_sharefile)
- self.shares = self.find_shares(self.uri)
+ self.shares = self.find_uri_shares(self.uri)
# Make sure that the storage server has the share.
self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
in self.shares)
d = nm.create_mutable_file(mutable_plaintext)
def _uploaded_mutable(node):
self.uri = node.get_uri()
- self.shares = self.find_shares(self.uri)
+ self.shares = self.find_uri_shares(self.uri)
d.addCallback(_uploaded_mutable)
else:
data = upload.Data(immutable_plaintext, convergence="")
d = self.c0.upload(data)
def _uploaded_immutable(upload_res):
self.uri = upload_res.uri
- self.shares = self.find_shares(self.uri)
+ self.shares = self.find_uri_shares(self.uri)
d.addCallback(_uploaded_immutable)
return d
# replace_shares, and asserting that the new set of shares equals the
# old is more to test this test code than to test the Tahoe code...
d = defer.succeed(None)
- d.addCallback(self.find_shares)
+ d.addCallback(self.find_all_shares)
stash = [None]
def _stash_it(res):
stash[0] = res
d.addCallback(_check)
def _remove_all(ignored):
- for sh in self.find_shares(self.uri):
+ for sh in self.find_uri_shares(self.uri):
self.delete_share(sh)
d.addCallback(_remove_all)
def _grab_sh0(res):
self.sh0_file = [sharefile
for (shnum, serverid, sharefile)
- in self.find_shares(self.uri)
+ in self.find_uri_shares(self.uri)
if shnum == 0][0]
self.sh0_orig = open(self.sh0_file, "rb").read()
d.addCallback(_grab_sh0)
self.set_up_grid(num_clients=2)
d = self.upload_and_stash()
- d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
def _stash_shares(oldshares):
self.oldshares = oldshares
d.addCallback(_stash_shares)
- d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
def _compare(newshares):
self.failUnlessEqual(newshares, self.oldshares)
d.addCallback(_compare)
for sh in self.oldshares[1:8]:
self.delete_share(sh)
d.addCallback(_delete_8)
- d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
d.addCallback(lambda ignored:
# test share corruption
def _test_corrupt(ignored):
olddata = {}
- shares = self.find_shares(self.uri)
+ shares = self.find_uri_shares(self.uri)
for (shnum, serverid, sharefile) in shares:
olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
for sh in shares:
d.addCallback(_test_corrupt)
def _remove_all(ignored):
- for sh in self.find_shares(self.uri):
+ for sh in self.find_uri_shares(self.uri):
self.delete_share(sh)
d.addCallback(_remove_all)
- d.addCallback(lambda ignored: self.find_shares(self.uri))
+ d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
return d
# Now we inspect the filesystem to make sure that it has 10
# shares.
- shares = self.find_shares(self.uri)
+ shares = self.find_uri_shares(self.uri)
self.failIf(len(shares) < 10)
d.addCallback(_check_results)
self.failUnless(post.is_healthy(), post.data)
# Make sure we really have 10 shares.
- shares = self.find_shares(self.uri)
+ shares = self.find_uri_shares(self.uri)
self.failIf(len(shares) < 10)
d.addCallback(_check_results)
def OFF_test_repair_from_corruption_of_1(self):
d = defer.succeed(None)
- d.addCallback(self.find_shares)
+ d.addCallback(self.find_all_shares)
stash = [None]
def _stash_it(res):
stash[0] = res
# Now we inspect the filesystem to make sure that it has 10
# shares.
- shares = self.find_shares()
+ shares = self.find_all_shares()
self.failIf(len(shares) < 10)
# Now we assert that the verifier reports the file as healthy.
return d
- def _find_shares(self, basedir):
+ def _find_all_shares(self, basedir):
shares = []
for (dirpath, dirnames, filenames) in os.walk(basedir):
if "storage" not in dirpath:
def _test_debug(res):
# find a share. It is important to run this while there is only
# one slot in the grid.
- shares = self._find_shares(self.basedir)
+ shares = self._find_all_shares(self.basedir)
(client_num, storage_index, filename, shnum) = shares[0]
log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
% filename)
def _corrupt_shares(res):
# run around and flip bits in all but k of the shares, to test
# the hash checks
- shares = self._find_shares(self.basedir)
+ shares = self._find_all_shares(self.basedir)
## sort by share number
#shares.sort( lambda a,b: cmp(a[3], b[3]) )
where = dict([ (shnum, filename)
+# -*- coding: utf-8 -*-
+
import os, shutil
from cStringIO import StringIO
from twisted.trial import unittest
d.addCallback(_done)
return d
+# copied from python docs because itertools.combinations was added in
+# python 2.6 and we support >= 2.4.
+def combinations(iterable, r):
+    """Yield all r-length tuples of elements of iterable, without
+    repetition, in lexicographic order of element indices.
+
+    This is a backport of itertools.combinations, copied from the
+    Python documentation's pure-Python equivalent.
+    """
+    # combinations('ABCD', 2) --> AB AC AD BC BD CD
+    # combinations(range(4), 3) --> 012 013 023 123
+    pool = tuple(iterable)
+    n = len(pool)
+    if r > n:
+        # Fewer than r elements available: there are no combinations.
+        return
+    indices = range(r)
+    yield tuple(pool[i] for i in indices)
+    while True:
+        # Scan right-to-left for the first index that can still advance
+        # (i.e. that has not yet reached its maximal position i + n - r).
+        for i in reversed(range(r)):
+            if indices[i] != i + n - r:
+                break
+        else:
+            # All indices are maximal: every combination has been yielded.
+            return
+        indices[i] += 1
+        # Reset all indices to the right of i to their minimal values,
+        # each one greater than its left neighbor.
+        for j in range(i+1, r):
+            indices[j] = indices[j-1] + 1
+        yield tuple(pool[i] for i in indices)
+
+def is_happy_enough(servertoshnums, h, k):
+    """Return True if the share placement in servertoshnums achieves
+    "servers of happiness" level h: for every combination of h servers,
+    every k-server subset of that combination must collectively hold at
+    least k distinct share numbers.  I do this with a naive "brute force
+    search" over all combinations.  (See
+    src/allmydata/util/happinessutil.py for a better algorithm.)
+
+    servertoshnums maps each server to a set of share numbers it holds.
+    """
+    if len(servertoshnums) < h:
+        # Not even h servers hold shares, so level h is unreachable.
+        return False
+    for happysetcombo in combinations(servertoshnums.iterkeys(), h):
+        for subsetcombo in combinations(happysetcombo, k):
+            # Distinct share numbers held by this k-server subset.
+            shnums = reduce(set.union, [ servertoshnums[s] for s in subsetcombo ])
+            if len(shnums) < k:
+                return False
+    return True
+
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
ShouldFailMixin):
+ def find_all_shares(self, unused=None):
+ """Locate shares on disk. Returns a dict that maps
+ server to set of sharenums.
+ """
+ assert self.g, "I tried to find a grid at self.g, but failed"
+ servertoshnums = {} # k: server, v: set(shnum)
+
+ for i, c in self.g.servers_by_number.iteritems():
+ for (dirp, dirns, fns) in os.walk(c.sharedir):
+ for fn in fns:
+ try:
+ sharenum = int(fn)
+ except TypeError:
+ # Whoops, I guess that's not a share file then.
+ pass
+ else:
+ servertoshnums.setdefault(i, set()).add(sharenum)
+
+ return servertoshnums
+
def _do_upload_with_broken_servers(self, servers_to_break):
"""
I act like a normal upload, but before I send the results of
d.addCallback(_have_shareholders)
return d
+    def _has_happy_share_distribution(self):
+        """Return True if the shares currently on disk in the grid
+        satisfy the "servers of happiness" level that client 0 is
+        configured with, as judged by is_happy_enough()."""
+        servertoshnums = self.find_all_shares()
+        k = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['k']
+        h = self.g.clients[0].DEFAULT_ENCODING_PARAMETERS['happy']
+        return is_happy_enough(servertoshnums, h, k)
def _add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
str(share_number))
if old_share_location != new_share_location:
shutil.copy(old_share_location, new_share_location)
- shares = self.find_shares(self.uri)
+ shares = self.find_uri_shares(self.uri)
# Make sure that the storage server has the share.
self.failUnless((share_number, ss.my_nodeid, new_share_location)
in shares)
self.uri = ur.uri
d.addCallback(_store_uri)
d.addCallback(lambda ign:
- self.find_shares(self.uri))
+ self.find_uri_shares(self.uri))
def _store_shares(shares):
self.shares = shares
d.addCallback(_store_shares)
d.addCallback(lambda ign: self._add_server(4, False))
# and this time the upload ought to succeed
d.addCallback(lambda ign: c.upload(DATA))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
# This scenario is basically comment:53, but changed so that the
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
# Try the same thing, but with empty servers after the first one
# servers of happiness were pushed.
d.addCallback(lambda results:
self.failUnlessEqual(results.pushed_shares, 3))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
def test_problem_layout_ticket1124(self):
d.addCallback(_setup)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
test_problem_layout_ticket1124.todo = "Fix this after 1.7.1 release."
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_prepare_client)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_setup)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
test_problem_layout_comment_187.todo = "this isn't fixed yet"
d.addCallback(_setup)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
def test_upload_succeeds_with_some_homeless_shares(self):
d.addCallback(_server_setup)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_server_setup)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
+ d.addCallback(lambda ign:
+ self.failUnless(self._has_happy_share_distribution()))
return d
d.addCallback(_compute_fileurls)
def _clobber_shares(ignored):
- good_shares = self.find_shares(self.uris["good"])
+ good_shares = self.find_uri_shares(self.uris["good"])
self.failUnlessReallyEqual(len(good_shares), 10)
- sick_shares = self.find_shares(self.uris["sick"])
+ sick_shares = self.find_uri_shares(self.uris["sick"])
os.unlink(sick_shares[0][2])
- dead_shares = self.find_shares(self.uris["dead"])
+ dead_shares = self.find_uri_shares(self.uris["dead"])
for i in range(1, 10):
os.unlink(dead_shares[i][2])
- c_shares = self.find_shares(self.uris["corrupt"])
+ c_shares = self.find_uri_shares(self.uris["corrupt"])
cso = CorruptShareOptions()
cso.stdout = StringIO()
cso.parseOptions([c_shares[0][2]])
d.addCallback(_compute_fileurls)
def _clobber_shares(ignored):
- good_shares = self.find_shares(self.uris["good"])
+ good_shares = self.find_uri_shares(self.uris["good"])
self.failUnlessReallyEqual(len(good_shares), 10)
- sick_shares = self.find_shares(self.uris["sick"])
+ sick_shares = self.find_uri_shares(self.uris["sick"])
os.unlink(sick_shares[0][2])
- dead_shares = self.find_shares(self.uris["dead"])
+ dead_shares = self.find_uri_shares(self.uris["dead"])
for i in range(1, 10):
os.unlink(dead_shares[i][2])
- c_shares = self.find_shares(self.uris["corrupt"])
+ c_shares = self.find_uri_shares(self.uris["corrupt"])
cso = CorruptShareOptions()
cso.stdout = StringIO()
cso.parseOptions([c_shares[0][2]])
d.addCallback(_compute_fileurls)
def _clobber_shares(ignored):
- sick_shares = self.find_shares(self.uris["sick"])
+ sick_shares = self.find_uri_shares(self.uris["sick"])
os.unlink(sick_shares[0][2])
d.addCallback(_clobber_shares)
#d.addCallback(_stash_uri, "corrupt")
def _clobber_shares(ignored):
- good_shares = self.find_shares(self.uris["good"])
+ good_shares = self.find_uri_shares(self.uris["good"])
self.failUnlessReallyEqual(len(good_shares), 10)
- sick_shares = self.find_shares(self.uris["sick"])
+ sick_shares = self.find_uri_shares(self.uris["sick"])
os.unlink(sick_shares[0][2])
- #dead_shares = self.find_shares(self.uris["dead"])
+ #dead_shares = self.find_uri_shares(self.uris["dead"])
#for i in range(1, 10):
# os.unlink(dead_shares[i][2])
- #c_shares = self.find_shares(self.uris["corrupt"])
+ #c_shares = self.find_uri_shares(self.uris["corrupt"])
#cso = CorruptShareOptions()
#cso.stdout = StringIO()
#cso.parseOptions([c_shares[0][2]])
def _count_leases(self, ignored, which):
u = self.uris[which]
- shares = self.find_shares(u)
+ shares = self.find_uri_shares(u)
lease_counts = []
for shnum, serverid, fn in shares:
sf = get_share_file(fn)