d.addCallback(self._got_mapupdate_results)
if verify:
d.addCallback(self._verify_all_shares)
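+ # generate the results before attempting repair: _maybe_do_repair
+ # consults self.need_repair and hands self.results to the node's repair()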
+ d.addCallback(self._generate_results)
if repair:
d.addCallback(self._maybe_do_repair)
- d.addCallback(self._generate_results)
d.addCallback(self._return_results)
return d
if alleged_writekey != self._node.get_writekey():
raise CorruptShareError(peerid, shnum, "invalid privkey")
- def _maybe_do_repair(self, res):
- if not self.need_repair:
- return
- self.results.repair_attempted = True
- d = self._node.repair(self)
- def _repair_finished(repair_results):
- self.results.repair_succeeded = True
- self.results.repair_results = repair_results
- def _repair_error(f):
- # I'm not sure if I want to pass through a failure or not.
- self.results.repair_succeeded = False
- self.results.repair_failure = f
- return f
- d.addCallbacks(_repair_finished, _repair_error)
- return d
-
def _generate_results(self, res):
self.results.healthy = True
smap = self.results.servermap
self.results.status_report = "\n".join(report) + "\n"
+ def _maybe_do_repair(self, res):
+ if not self.need_repair:
+ return
+ self.results.repair_attempted = True
+ d = self._node.repair(self.results)
+ def _repair_finished(repair_results):
+ self.results.repair_succeeded = True
+ self.results.repair_results = repair_results
+ def _repair_error(f):
+ # I'm not sure if I want to pass through a failure or not.
+ self.results.repair_succeeded = False
+ self.results.repair_failure = f
+ return f
+ d.addCallbacks(_repair_finished, _repair_error)
+ return d
+
def _return_results(self, res):
return self.results
self.storage_index_s = base32.b2a(storage_index)[:6]
self.repair_attempted = False
self.status_report = "[not generated yet]" # string
+ self.repair_report = None
self.problems = [] # list of (peerid, storage_index, shnum, failure)
def is_healthy(self):
s += "\n"
s += self.status_report
s += "\n"
+ if self.repair_attempted:
+ s += "Repair attempted "
+ if self.repair_succeeded:
+ s += "and successful\n"
+ else:
+ s += "and failed\n"
+ s += "\n"
+ s += self.repair_results.to_string()
+ s += "\n"
return s
self._client.notify_mapupdate(u.get_status())
return u.update()
- def download_version(self, servermap, version):
+ def download_version(self, servermap, version, fetch_privkey=False):
return self._do_serialized(self._try_once_to_download_version,
- servermap, version)
- def _try_once_to_download_version(self, servermap, version):
- r = Retrieve(self, servermap, version)
+ servermap, version, fetch_privkey)
+ def _try_once_to_download_version(self, servermap, version,
+ fetch_privkey=False):
+ r = Retrieve(self, servermap, version, fetch_privkey)
self._client.notify_retrieve(r.get_status())
return r.download()
self.connections[peerid] = self._servermap.connections[peerid]
# then we add in all the shares that were bad (corrupted, bad
# signatures, etc). We want to replace these.
- for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
- self.goal.add( (peerid, shnum) )
- self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
+ for key, old_checkstring in self._servermap.bad_shares.items():
+ (peerid, shnum) = key
+ self.goal.add(key)
+ self.bad_share_checkstrings[key] = old_checkstring
self.connections[peerid] = self._servermap.connections[peerid]
# create the shares. We'll discard these as they are delivered. SDMF:
class RepairResults:
implements(IRepairResults)
+ def to_string(self):
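+ # no detailed repair report is produced yet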
+ return ""
+
class MustForceRepairError(Exception):
pass
# servermap.bad_shares . Publish knows that it should try and replace
# these.
+ # I chose to use the retrieve phase to ensure that the privkey is
+ # available, to avoid the extra roundtrip that would occur if we,
+ # say, added an smap.get_privkey() method.
+
+ assert self.node.get_writekey() # repair currently requires a writecap
+
best_version = smap.best_recoverable_version()
- d = self.node.download_version(smap, best_version)
+ d = self.node.download_version(smap, best_version, fetch_privkey=True)
d.addCallback(self.node.upload, smap)
d.addCallback(self.get_results)
return d
from allmydata import hashtree, codec, storage
from allmydata.immutable.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES
+from pycryptopp.publickey import rsa
from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data
# Retrieve object will remain tied to a specific version of the file, and
# will use a single ServerMap instance.
- def __init__(self, filenode, servermap, verinfo):
+ def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
self._node = filenode
assert self._node._pubkey
self._storage_index = filenode.get_storage_index()
self.servermap = servermap
assert self._node._pubkey
self.verinfo = verinfo
+ # during repair, we may be called upon to grab the private key, since
+ # it wasn't picked up during a verify=False checker run, and we'll
+ # need it for repair to generate a new version.
+ self._need_privkey = fetch_privkey
+ if self._node._privkey:
+ self._need_privkey = False
self._status = RetrieveStatus()
self._status.set_storage_index(self._storage_index)
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
offsets = dict(offsets_tuple)
+
# we read the checkstring, to make sure that the data we grab is from
- # the right version. We also read the data, and the hashes necessary
- # to validate them (share_hash_chain, block_hash_tree, share_data).
- # We don't read the signature or the pubkey, since that was handled
- # during the servermap phase, and we'll be comparing the share hash
- # chain against the roothash that was validated back then.
- readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
- (offsets['share_hash_chain'],
- offsets['enc_privkey'] - offsets['share_hash_chain']),
- ]
+ # the right version.
+ readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
+
+ # We also read the data, and the hashes necessary to validate them
+ # (share_hash_chain, block_hash_tree, share_data). We don't read the
+ # signature or the pubkey, since that was handled during the
+ # servermap phase, and we'll be comparing the share hash chain
+ # against the roothash that was validated back then.
+
+ readv.append( (offsets['share_hash_chain'],
+ offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
+
+ # if we need the private key (for repair), we also fetch that
+ if self._need_privkey:
+ readv.append( (offsets['enc_privkey'],
+ offsets['EOF'] - offsets['enc_privkey']) )
m = Marker()
self._outstanding_queries[m] = (peerid, shnum, started)
# shares if we get them.. seems better than an assert().
for shnum,datav in datavs.items():
- (prefix, hash_and_data) = datav
+ (prefix, hash_and_data) = datav[:2]
try:
self._got_results_one_share(shnum, peerid,
prefix, hash_and_data)
self._status.problems[peerid] = f
self._last_failure = f
pass
+ if self._need_privkey and len(datav) > 2:
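+ # when the privkey was requested, the encrypted privkey comes back
+ # as datav[2]; there is no log parent handy here, so pass None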
+ lp = None
+ self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
# all done!
def _got_results_one_share(self, shnum, peerid,
# self.shares
self.shares[shnum] = share_data
+ def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+
+ alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
+ alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
+ if alleged_writekey != self._node.get_writekey():
+ self.log("invalid privkey from %s shnum %d" %
+ (idlib.nodeid_b2a(peerid)[:8], shnum),
+ parent=lp, level=log.WEIRD, umid="YIw4tA")
+ return
+
+ # it's good
+ self.log("got valid privkey from shnum %d on peerid %s" %
+ (shnum, idlib.shortnodeid_b2a(peerid)),
+ parent=lp)
+ privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
+ self._node._populate_encprivkey(enc_privkey)
+ self._node._populate_privkey(privkey)
+ self._need_privkey = False
+
def _query_failed(self, f, marker, peerid):
self.log(format="query to [%(peerid)s] failed",
peerid=idlib.shortnodeid_b2a(peerid),
if len(self.shares) < k:
# we don't have enough shares yet
return self._maybe_send_more_queries(k)
+ if self._need_privkey:
+ # we got k shares, but none of them had a valid privkey. TODO:
+ # look further. Adding code to do this is a bit complicated, and
+ # I want to avoid that complication, and this should be pretty
+ # rare (k shares with bitflips in the enc_privkey but not in the
+ # data blocks). If we actually do get here, the subsequent repair
+ # will fail for lack of a privkey.
+ self.log("got k shares but still need_privkey, bummer",
+ level=log.WEIRD, umid="MdRHPA")
# we have enough to finish. All the shares have had their hashes
# checked, so if something fails at this point, we don't know how
@ivar connections: maps peerid to a RemoteReference
- @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
+ @ivar bad_shares: dict keyed by (peerid, shnum) tuples, describing
shares that I should ignore (because a previous user of
the servermap determined that they were invalid). The
updater only locates a certain number of shares: if
some of these turn out to have integrity problems and
are unusable, the caller will need to mark those shares
as bad, then re-update the servermap, then try again.
+ The dict maps each (peerid, shnum) tuple to that share's old checkstring.
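+ For example: { (peerid, shnum): old_checkstring }.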
"""
def __init__(self):
self._need_privkey = False
if mode == MODE_WRITE and not self._node._privkey:
self._need_privkey = True
+ # check+repair: repair requires the privkey, so if we didn't happen
+ # to ask for it during the check, we'll have problems doing the
+ # publish.
prefix = storage.si_b2a(self._storage_index)[:5]
self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
shid_re = (r"Corrupt Shares:\s+%s: block hash tree failure" %
self.corrupt_shareid)
self.failUnless(re.search(shid_re, out), out)
+ d.addCallback(_got_results)
+ # now make sure the webapi repairer can fix it
+ def _do_repair(res):
+ url = (self.webish_url +
+ "uri/%s" % urllib.quote(self.node.get_uri()) +
+ "?t=check&verify=true&repair=true")
+ return getPage(url, method="POST")
+ d.addCallback(_do_repair)
+ def _got_repair_results(out):
+ self.failUnless("Repair attempted and successful" in out)
+ d.addCallback(_got_repair_results)
+ d.addCallback(_do_check)
+ def _got_postrepair_results(out):
+ self.failIf("Not Healthy!" in out, out)
+ self.failUnless("Recoverable Versions: 10*seq" in out)
+ d.addCallback(_got_postrepair_results)
+
+ return d
+
+ def test_delete_share(self):
+ self.basedir = self.mktemp()
+ d = self.set_up_nodes()
+ CONTENTS = "a little bit of data"
+ d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
+ def _created(node):
+ self.node = node
+ si = self.node.get_storage_index()
+ out = self._run_cli(["debug", "find-shares", base32.b2a(si),
+ self.clients[1].basedir])
+ files = out.split("\n")
+ # delete one of them, to simulate a lost share
+ os.unlink(files[0])
+ d.addCallback(_created)
+ # now make sure the webapi checker notices it
+ def _do_check(res):
+ url = (self.webish_url +
+ "uri/%s" % urllib.quote(self.node.get_uri()) +
+ "?t=check&verify=false")
+ return getPage(url, method="POST")
+ d.addCallback(_do_check)
+ def _got_results(out):
+ self.failUnless("Not Healthy!" in out, out)
+ self.failUnless("Unhealthy: best recoverable version has only 9 shares (encoding is 3-of-10)" in out, out)
+ self.failIf("Corrupt Shares" in out, out)
d.addCallback(_got_results)
+
+ # now make sure the webapi repairer can fix it
+ def _do_repair(res):
+ url = (self.webish_url +
+ "uri/%s" % urllib.quote(self.node.get_uri()) +
+ "?t=check&verify=false&repair=true")
+ return getPage(url, method="POST")
+ d.addCallback(_do_repair)
+ def _got_repair_results(out):
+ self.failUnless("Repair attempted and successful" in out)
+ d.addCallback(_got_repair_results)
+ d.addCallback(_do_check)
+ def _got_postrepair_results(out):
+ self.failIf("Not Healthy!" in out, out)
+ self.failUnless("Recoverable Versions: 10*seq" in out)
+ d.addCallback(_got_postrepair_results)
+
return d