self.progress = 0.0
self.counter = self.statusid_counter.next()
self.started = time.time()
+ self.finished = None
- def add_per_server_time(self, peerid, op, elapsed):
- assert op in ("query", "privkey")
+ def add_per_server_time(self, peerid, op, sent, elapsed):
+ assert op in ("query", "late", "privkey")
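+ # "late" marks a response that arrives after this update has stopped running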
if peerid not in self.timings["per_server"]:
self.timings["per_server"][peerid] = []
- self.timings["per_server"][peerid].append((op,elapsed))
+ self.timings["per_server"][peerid].append((op,sent,elapsed))
def get_started(self):
return self.started
+ def get_finished(self):
+ return self.finished
def get_storage_index(self):
return self.storage_index
def get_mode(self):
self.progress = value
def set_active(self, value):
self.active = value
+ def set_finished(self, when):
+ self.finished = when
class ServerMap:
"""I record the placement of mutable shares.
(idlib.shortnodeid_b2a(peerid), shnum,
seqnum, base32.b2a(root_hash)[:4], k, N,
datalength))
+ if self.problems:
+ print >>out, "%d PROBLEMS" % len(self.problems)
+ for f in self.problems:
+ print >>out, str(f)
return out
def all_peers(self):
(verinfo, timestamp) = self.servermap[key]
return verinfo
return None
- return None
def shares_available(self):
"""Return a dict that maps verinfo to tuples of
# Return a dict of versionid -> health, for versions that are
# unrecoverable and have later seqnums than any recoverable versions.
# These indicate that a write will lose data.
- pass
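+ # For example: if nine shares hold a recoverable seqnum-3 version (k=3)
+ # and a single share holds a seqnum-4 version, this returns
+ # {<seqnum-4 verinfo>: (1, 3)}: a write based on seqnum 3 would clobber it.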
+ versionmap = self.make_versionmap()
+ healths = {} # maps verinfo to (found,k)
+ unrecoverable = set()
+ highest_recoverable_seqnum = -1
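+ # first pass: count distinct shares per version and track the newest recoverable seqnum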
+ for (verinfo, shares) in versionmap.items():
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = verinfo
+ shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ healths[verinfo] = (len(shnums),k)
+ if len(shnums) < k:
+ unrecoverable.add(verinfo)
+ else:
+ highest_recoverable_seqnum = max(seqnum,
+ highest_recoverable_seqnum)
+
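+ # second pass: keep only the unrecoverable versions newer than every recoverable one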
+ newversions = {}
+ for verinfo in unrecoverable:
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = verinfo
+ if seqnum > highest_recoverable_seqnum:
+ newversions[verinfo] = healths[verinfo]
+
+ return newversions
+
def needs_merge(self):
# return True if there are multiple recoverable versions with the
# same seqnum, meaning that MutableFileNode.read_best_version is not
# giving you the whole story, and that using its data to do a
# subsequent publish will lose information.
- pass
+ seqnums = [verinfo[0] for verinfo in self.recoverable_versions()]
+ return len(seqnums) != len(set(seqnums))
+
class ServermapUpdater:
def __init__(self, filenode, servermap, mode=MODE_READ):
level=log.NOISY)
now = time.time()
elapsed = now - started
- self._status.add_per_server_time(peerid, "query", elapsed)
self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
if not self._running:
self.log("but we're not running, so we'll ignore it", parent=lp)
+ self._status.add_per_server_time(peerid, "late", started, elapsed)
return
+ self._status.add_per_server_time(peerid, "query", started, elapsed)
if datavs:
self._good_peers.add(peerid)
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = r
- return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
+ return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
def _got_privkey_results(self, datavs, peerid, shnum, started):
now = time.time()
elapsed = now - started
- self._status.add_per_server_time(peerid, "privkey", elapsed)
+ self._status.add_per_server_time(peerid, "privkey", started, elapsed)
self._queries_outstanding.discard(peerid)
if not self._need_privkey:
return
if not self._running:
return
self._running = False
- elapsed = time.time() - self._started
+ now = time.time()
+ elapsed = now - self._started
+ self._status.set_finished(now)
self._status.timings["total"] = elapsed
self._status.set_progress(1.0)
self._status.set_status("Done")
d = self._client.create_mutable_file("New contents go here")
def _created(node):
self._fn = node
+ self._fn2 = self._client.create_node_from_uri(node.get_uri())
d.addCallback(_created)
return d
- def make_servermap(self, mode=MODE_CHECK):
- smu = ServermapUpdater(self._fn, ServerMap(), mode)
+ def make_servermap(self, mode=MODE_CHECK, fn=None):
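+ # fn lets a test target a different filenode (e.g. the sibling self._fn2)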
+ if fn is None:
+ fn = self._fn
+ smu = ServermapUpdater(fn, ServerMap(), mode)
d = smu.update()
return d
d.addCallback(lambda sm: us(sm, mode=MODE_READ))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
+ d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
return d
+ def test_fetch_privkey(self):
+ d = defer.succeed(None)
+ # use the sibling filenode (which hasn't been used yet), and make
+ # sure it can fetch the privkey. The file is small, so the privkey
+ # will be fetched on the first (query) pass.
+ d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2))
+ d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+
+ # create a new file, which is large enough to knock the privkey out
+ # of the early part of the file.
+ LARGE = "These are Larger contents" * 200 # about 5KB
+ d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+ def _created(large_fn):
+ large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+ return self.make_servermap(MODE_WRITE, large_fn2)
+ d.addCallback(_created)
+ d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+ return d
+
def test_mark_bad(self):
d = defer.succeed(None)
ms = self.make_servermap
self.failUnlessEqual(best, None)
self.failUnlessEqual(len(sm.shares_available()), 1)
self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
+ return sm
def test_not_quite_enough_shares(self):
s = self._client._storage
d.addCallback(lambda res: ms(mode=MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
+ d.addCallback(lambda sm:
+ self.failUnlessEqual(len(sm.make_sharemap()), 2))
d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
d.addCallback(lambda res: ms(mode=MODE_WRITE))
if substring:
allproblems = [str(f) for f in servermap.problems]
self.failUnless(substring in "".join(allproblems))
- return
+ return servermap
if should_succeed:
d1 = self._fn.download_best_version()
d1.addCallback(lambda new_contents:
self.failUnlessEqual(new_contents, self.CONTENTS))
- return d1
else:
- return self.shouldFail(NotEnoughSharesError,
- "_corrupt_all(offset=%s)" % (offset,),
- substring,
- self._fn.download_best_version)
+ d1 = self.shouldFail(NotEnoughSharesError,
+ "_corrupt_all(offset=%s)" % (offset,),
+ substring,
+ self._fn.download_best_version)
+ d1.addCallback(lambda res: servermap)
+ return d1
d.addCallback(_do_retrieve)
return d
def test_corrupt_all_verbyte(self):
# when the version byte is not 0, we hit an assertion error in
# unpack_share().
- return self._test_corrupt_all(0, "AssertionError")
+ d = self._test_corrupt_all(0, "AssertionError")
+ def _check_servermap(servermap):
+ # and the dump should mention the problems
+ s = StringIO()
+ dump = servermap.dump(s).getvalue()
+ self.failUnless("10 PROBLEMS" in dump, dump)
+ d.addCallback(_check_servermap)
+ return d
def test_corrupt_all_seqnum(self):
# a corrupt sequence number will trigger a bad signature
fn2._pubkey = fn._pubkey
fn2._privkey = fn._privkey
fn2._encprivkey = fn._encprivkey
- fn2._current_seqnum = 0
- fn2._current_roothash = "\x00" * 32
# and set the encoding parameters to something completely different
fn2._required_shares = k
fn2._total_shares = n
d.addCallback(_retrieved)
return d
+class MultipleVersions(unittest.TestCase):
+ def setUp(self):
+ self.CONTENTS = ["Contents 0",
+ "Contents 1",
+ "Contents 2",
+ "Contents 3a",
+ "Contents 3b"]
+ self._copied_shares = {}
+ num_peers = 20
+ self._client = FakeClient(num_peers)
+ self._storage = self._client._storage
+ d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+ def _created(node):
+ self._fn = node
+ # now create multiple versions of the same file, and accumulate
+ # their shares, so we can mix and match them later.
+ d = defer.succeed(None)
+ d.addCallback(self._copy_shares, 0)
+ d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2
+ d.addCallback(self._copy_shares, 1)
+ d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3
+ d.addCallback(self._copy_shares, 2)
+ d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a
+ d.addCallback(self._copy_shares, 3)
+ # now we replace all the shares with version s3, and upload a new
+ # version to get s4b.
+ rollback = dict([(i,2) for i in range(10)])
+ d.addCallback(lambda res: self._set_versions(rollback))
+ d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b
+ d.addCallback(self._copy_shares, 4)
+ # we leave the storage in state 4
+ return d
+ d.addCallback(_created)
+ return d
+
+ def _copy_shares(self, ignored, index):
+ shares = self._client._storage._peers
+ # we need a deep copy
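+ # (fresh outer and inner dicts, so later writes can't mutate this snapshot)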
+ new_shares = {}
+ for peerid in shares:
+ new_shares[peerid] = {}
+ for shnum in shares[peerid]:
+ new_shares[peerid][shnum] = shares[peerid][shnum]
+ self._copied_shares[index] = new_shares
+
+ def _set_versions(self, versionmap):
+ # versionmap maps shnums to which version (0,1,2,3,4) we want the
+ # share to be at. Any shnum which is left out of the map will stay at
+ # its current version.
+ shares = self._client._storage._peers
+ oldshares = self._copied_shares
+ for peerid in shares:
+ for shnum in shares[peerid]:
+ if shnum in versionmap:
+ index = versionmap[shnum]
+ shares[peerid][shnum] = oldshares[index][peerid][shnum]
+
+ def test_multiple_versions(self):
+ # if we see a mix of versions in the grid, download_best_version
+ # should get the latest one
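+ # (even shnums are rolled back to version 2; odd shnums keep version 4)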
+ self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
+ d = self._fn.download_best_version()
+ d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
+ # but if everything is at version 2, that's what we should download
+ d.addCallback(lambda res:
+ self._set_versions(dict([(i,2) for i in range(10)])))
+ d.addCallback(lambda res: self._fn.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+ # if exactly one share is at version 3, we should still get v2
+ d.addCallback(lambda res:
+ self._set_versions({0:3}))
+ d.addCallback(lambda res: self._fn.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+ # but the servermap should see the unrecoverable version. This
+ # depends upon the single newer share being queried early.
+ d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+ def _check_smap(smap):
+ self.failUnlessEqual(len(smap.unrecoverable_versions()), 1)
+ newer = smap.unrecoverable_newer_versions()
+ self.failUnlessEqual(len(newer), 1)
+ verinfo, health = newer.items()[0]
+ self.failUnlessEqual(verinfo[0], 4)
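+ # health is (number of distinct shares found, k)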
+ self.failUnlessEqual(health, (1,3))
+ self.failIf(smap.needs_merge())
+ d.addCallback(_check_smap)
+ # if we have a mix of two parallel versions (s4a and s4b), we could
+ # recover either
+ d.addCallback(lambda res:
+ self._set_versions({0:3,2:3,4:3,6:3,8:3,
+ 1:4,3:4,5:4,7:4,9:4}))
+ d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+ def _check_smap_mixed(smap):
+ self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
+ newer = smap.unrecoverable_newer_versions()
+ self.failUnlessEqual(len(newer), 0)
+ self.failUnless(smap.needs_merge())
+ d.addCallback(_check_smap_mixed)
+ d.addCallback(lambda res: self._fn.download_best_version())
+ d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or
+ res == self.CONTENTS[4]))
+ return d
+
class Utils(unittest.TestCase):
def test_dict_of_sets(self):