No functional changes, but remove unused code, improve or fix docstrings, etc.
index", which is used for both server selection (described below) and
to index shares within the Storage Servers on the selected peers.
-A variety of hashes are computed while the shares are being produced,
-to validate the plaintext, the ciphertext, and the shares
-themselves. Merkle hash trees are also produced to enable validation
-of individual segments of plaintext or ciphertext without requiring
+Hashes are computed while the shares are being produced, to validate
+the ciphertext and the shares themselves. Merkle hash trees are used to
+enable validation of individual segments of ciphertext without requiring
the download/decoding of the whole file. These hashes go into the
"Capability Extension Block", which will be stored with each share.
Return a list of node indices that are necessary for the hash chain.
"""
if i < 0 or i >= len(self):
- raise IndexError('index out of range: ' + repr(i))
+ raise IndexError('index out of range: 0 <= %s < %s' % (i, len(self)))
needed = []
here = i
while here != 0:
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
-class HaveAllPeersError(Exception):
- # we use this to jump out of the loop
- pass
-
class IntegrityCheckError(Exception):
pass
"""
def __init__(self, vbucket, blocknum, parent, results):
+ precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
prefix = "%s-%d" % (vbucket, blocknum)
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self.vbucket = vbucket
# through it.
downloaders = []
for blocknum, vbucket in active_buckets.iteritems():
+ assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
bd = BlockDownloader(vbucket, blocknum, self, self.results)
downloaders.append(bd)
if self.results:
return d
def send_all_share_hash_trees(self):
- # each bucket gets a set of share hash tree nodes that are needed to
- # validate their share. This includes the share hash itself, but does
- # not include the top-level hash root (which is stored securely in
- # the URI instead).
+ # Each bucket gets a set of share hash tree nodes that are needed to validate their
+ # share. This includes the share hash itself, but does not include the top-level hash
+ # root (which is stored securely in the URI instead).
self.log("sending all share hash trees", level=log.NOISY)
self.set_status("Sending Share Hash Trees")
self.set_encode_and_push_progress(extra=0.6)
def needs_rebalancing():
"""Return a boolean, True if the file/dir's reliability could be
improved by moving shares to new servers. Non-distributed LIT files
- always returne False."""
+ always return False."""
def get_data():
- """Return a dictionary that describes the state of the file/dir.
- Non-distributed LIT files always return an empty dictionary. Normal
- files and directories return a dictionary with the following keys
- (note that these use base32-encoded strings rather than binary ones)
- (also note that for mutable files, these counts are for the 'best'
- version)::
+ """Return a dictionary that describes the state of the file/dir. Non-distributed LIT
+ files always return an empty dictionary. Normal files and directories return a
+ dictionary with the following keys (note that these use binary strings rather than
+ base32-encoded ones) (also note that for mutable files, these counts are for the 'best'
+ version)::
count-shares-good: the number of distinct good shares that were found
count-shares-needed: 'k', the number of shares required for recovery
sharenum).
servers-responding: list of (binary) storage server identifiers,
one for each server which responded to the share
- query.
+ query (even if they said they didn't have shares,
+ and even if they said they did have shares but then
+ refused to send them when asked, and even if they
+ said they did have shares and sent incorrect ones
+ when asked).
sharemap: dict mapping share identifier to list of serverids
(binary strings). This indicates which servers are holding
which shares. For immutable files, the shareid is an
self.home = filename
f = open(self.home, 'rb')
(version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
- assert version == 1
+ assert version == 1, version
self._size = size
self._num_leases = num_leases
self._data_offset = 0xc
for i in range(7, 10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
- self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(NotEnoughSharesError))
+ self.failUnless(isinstance(res, Failure), res)
+ self.failUnless(res.check(NotEnoughSharesError), res)
d.addBoth(_done)
return d
for i in range(7, 10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
- self.failUnless(isinstance(res, Failure))
- self.failUnless(res.check(NotEnoughSharesError))
+ self.failUnless(isinstance(res, Failure), res)
+ self.failUnless(res.check(NotEnoughSharesError), res)
d.addBoth(_done)
return d
# 'tahoe stop' command takes a while.
def _stop(res):
open(HOTLINE_FILE, "w").write("")
- self.failUnless(os.path.exists(TWISTD_PID_FILE))
+ self.failUnless(os.path.exists(TWISTD_PID_FILE), (TWISTD_PID_FILE, os.listdir(os.path.dirname(TWISTD_PID_FILE))))
argv = ["--quiet", "stop", c1]
out,err = StringIO(), StringIO()
rc = runner.runner(argv, stdout=out, stderr=err)
base32.b2a(n.get_storage_index()), where)
needs_rebalancing = bool( len(self.clients) < 10 )
if not incomplete:
- self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
+ self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, str((where, cr, cr.get_data())))
d = cr.get_data()
self.failUnlessEqual(d["count-shares-good"], 10, where)
self.failUnlessEqual(d["count-shares-needed"], 3, where)
self.failUnlessEqual(sorted(d["servers-responding"]),
sorted([c.nodeid for c in self.clients]),
where)
- self.failUnless("sharemap" in d, where)
+ self.failUnless("sharemap" in d, str((where, d)))
all_serverids = set()
for (shareid, serverids) in d["sharemap"].items():
all_serverids.update(serverids)
def check_is_healthy(self, cr, where):
- self.failUnless(ICheckerResults.providedBy(cr), (cr, type(cr), where))
- self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where))
- self.failUnless(cr.is_recoverable(), where)
- d = cr.get_data()
- self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
- self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
- return cr
+ try:
+ self.failUnless(ICheckerResults.providedBy(cr), (cr, type(cr), where))
+ self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where))
+ self.failUnless(cr.is_recoverable(), where)
+ d = cr.get_data()
+ self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
+ self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
+ return cr
+ except Exception, le:
+ le.args = tuple(le.args + (where,))
+ raise
def check_is_missing_shares(self, cr, where):
self.failUnless(ICheckerResults.providedBy(cr), where)
# by "corrupt-shares" we mean the file is still recoverable
self.failUnless(ICheckerResults.providedBy(cr), where)
d = cr.get_data()
- self.failIf(cr.is_healthy(), where)
+ self.failIf(cr.is_healthy(), (where, cr))
self.failUnless(cr.is_recoverable(), where)
d = cr.get_data()
self.failUnless(d["count-shares-good"] < 10, where)