from allmydata.util import idlib
-MODE_CHECK = "query all peers"
-MODE_ANYTHING = "one recoverable version"
-MODE_WRITE = "replace all shares, probably" # not for initial creation
-MODE_ENOUGH = "enough"
+MODE_CHECK = "MODE_CHECK" # query all peers
+MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version
+MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial
+ # creation
+MODE_READ = "MODE_READ"
class NotMutableError(Exception):
pass
from pycryptopp.cipher.aes import AES
from publish import Publish
-from common import MODE_ENOUGH, MODE_WRITE, UnrecoverableFileError, \
+from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
ResponseCache
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
# methods exposed to the higher-layer application
- def update_servermap(self, old_map=None, mode=MODE_ENOUGH):
+ def update_servermap(self, old_map=None, mode=MODE_READ):
servermap = old_map or ServerMap()
d = self.obtain_lock()
d.addCallback(lambda res:
d.addCallback(_done)
return d
- def _update_and_retrieve_best(self, old_map=None, mode=MODE_ENOUGH):
+ def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
d = self.update_servermap(old_map=old_map, mode=mode)
def _updated(smap):
goal = smap.best_recoverable_version()
if isinstance(res, failure.Failure):
self.log("Retrieve done, with failure", failure=res)
else:
- self.log("Retrieve done, success!: res=%s" % (res,))
+ self.log("Retrieve done, success!")
# remember the encoding parameters, use them again next time
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
from allmydata import storage
from pycryptopp.publickey import rsa
-from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_ENOUGH, \
+from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share
seqnums.append(0)
return max(seqnums)
+ def summarize_versions(self):
+ """Return a string describing which versions we know about."""
+ versionmap = self.make_versionmap()
+ bits = []
+ for (verinfo, shares) in versionmap.items():
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = verinfo
+ shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ bits.append("%d*seq%d-%s" %
+ (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
+ return "/".join(bits)
+
def recoverable_versions(self):
"""Return a set of versionids, one for each version that is currently
recoverable."""
pass
class ServermapUpdater:
- def __init__(self, filenode, servermap, mode=MODE_ENOUGH):
+ def __init__(self, filenode, servermap, mode=MODE_READ):
"""I update a servermap, locating a sufficient number of useful
shares and remembering where they are located.
self._need_privkey = True
prefix = storage.si_b2a(self._storage_index)[:5]
- self._log_number = log.msg("SharemapUpdater(%s): starting" % prefix)
+ self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
+ si=prefix, mode=mode)
def log(self, *args, **kwargs):
if "parent" not in kwargs:
if self.mode == MODE_ANYTHING:
if recoverable_versions:
- self.log("MODE_ANYTHING and %d recoverable versions: done"
+ self.log("%d recoverable versions: done"
% len(recoverable_versions),
parent=lp)
return self._done()
if self.mode == MODE_CHECK:
# we used self._must_query, and we know there aren't any
# responses still waiting, so that means we must be done
- self.log("MODE_CHECK: done",
- parent=lp)
+ self.log("done", parent=lp)
return self._done()
MAX_IN_FLIGHT = 5
- if self.mode == MODE_ENOUGH:
+ if self.mode == MODE_READ:
# if we've queried k+epsilon servers, and we see a recoverable
# version, and we haven't seen any unrecoverable higher-seqnum'ed
# versions, then we're done.
if self._queries_completed < self.num_peers_to_query:
- self.log(format="ENOUGH, %(completed)d completed, %(query)d to query: need more",
+ self.log(format="%(completed)d completed, %(query)d to query: need more",
completed=self._queries_completed,
query=self.num_peers_to_query,
parent=lp)
return self._send_more_queries(MAX_IN_FLIGHT)
if not recoverable_versions:
- self.log("ENOUGH, no recoverable versions: need more",
+ self.log("no recoverable versions: need more",
parent=lp)
return self._send_more_queries(MAX_IN_FLIGHT)
highest_recoverable = max(recoverable_versions)
# don't yet see enough shares to recover it. Try harder.
# TODO: consider sending more queries.
# TODO: consider limiting the search distance
- self.log("ENOUGH, evidence of higher seqnum: need more")
+ self.log("evidence of higher seqnum: need more")
return self._send_more_queries(MAX_IN_FLIGHT)
# all the unrecoverable versions were old or concurrent with a
# recoverable version. Good enough.
- self.log("ENOUGH: no higher-seqnum: done",
- parent=lp)
+ self.log("no higher-seqnum: done", parent=lp)
return self._done()
if self.mode == MODE_WRITE:
# every server in the world.
if not recoverable_versions:
- self.log("WRITE, no recoverable versions: need more",
- parent=lp)
+ self.log("no recoverable versions: need more", parent=lp)
return self._send_more_queries(MAX_IN_FLIGHT)
last_found = -1
if last_found != -1:
num_not_found += 1
if num_not_found >= self.EPSILON:
- self.log("MODE_WRITE: found our boundary, %s" %
+ self.log("found our boundary, %s" %
"".join(states),
parent=lp)
found_boundary = True
# if we hit here, we didn't find our boundary, so we're still
# waiting for peers
- self.log("MODE_WRITE: no boundary yet, %s" % "".join(states),
- parent=lp)
+ self.log("no boundary yet, %s" % "".join(states), parent=lp)
return self._send_more_queries(MAX_IN_FLIGHT)
# otherwise, keep up to 5 queries in flight. TODO: this is pretty
self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
+ self.log("servermap: %s" % self._servermap.summarize_versions())
eventually(self._done_deferred.callback, self._servermap)
def _fatal_error(self, f):
from allmydata.mutable.node import MutableFileNode
from allmydata.mutable.common import DictOfSets, ResponseCache, \
- MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_ENOUGH, UnrecoverableFileError
+ MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, UnrecoverableFileError
from allmydata.mutable.retrieve import Retrieve
from allmydata.mutable.publish import Publish
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda res: ms(mode=MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
- d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=MODE_READ))
# this more stops at k+epsilon, and epsilon=k, so 6 shares
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
# increasing order of number of servers queried, since once a server
# gets into the servermap, we'll always ask it for an update.
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
- d.addCallback(lambda sm: us(sm, mode=MODE_ENOUGH))
+ d.addCallback(lambda sm: us(sm, mode=MODE_READ))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
ms = self.make_servermap
us = self.update_servermap
- d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=MODE_READ))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
def _made_map(sm):
v = sm.best_recoverable_version()
d.addCallback(lambda res: ms(mode=MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
- d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=MODE_READ))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
return d
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
d.addCallback(lambda res: ms(mode=MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
- d.addCallback(lambda res: ms(mode=MODE_ENOUGH))
+ d.addCallback(lambda res: ms(mode=MODE_READ))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
return d
d.addCallback(_created)
return d
- def make_servermap(self, mode=MODE_ENOUGH, oldmap=None):
+ def make_servermap(self, mode=MODE_READ, oldmap=None):
if oldmap is None:
oldmap = ServerMap()
smu = ServermapUpdater(self._fn, oldmap, mode)
def test_corrupt_some(self):
# corrupt the data of first five shares (so the servermap thinks
# they're good but retrieve marks them as bad), so that the
- # MODE_ENOUGH set of 6 will be insufficient, forcing node.download to
+ # MODE_READ set of 6 will be insufficient, forcing node.download to
# retry with more servers.
corrupt(None, self._storage, "share_data", range(5))
d = self.make_servermap()
d.addCallback(_published)
return d
- def make_servermap(self, mode=MODE_ENOUGH, oldmap=None):
+ def make_servermap(self, mode=MODE_READ, oldmap=None):
if oldmap is None:
oldmap = ServerMap()
smu = ServermapUpdater(self._fn, oldmap, mode)