self.progress = 0.0
self.counter = self.statusid_counter.next()
self.started = time.time()
+ self.finished = None
- def add_per_server_time(self, peerid, op, elapsed):
- assert op in ("query", "privkey")
+ def add_per_server_time(self, peerid, op, sent, elapsed):
+ assert op in ("query", "late", "privkey")
if peerid not in self.timings["per_server"]:
self.timings["per_server"][peerid] = []
- self.timings["per_server"][peerid].append((op,elapsed))
+ self.timings["per_server"][peerid].append((op,sent,elapsed))
def get_started(self):
return self.started
+ def get_finished(self):
+ return self.finished
def get_storage_index(self):
return self.storage_index
def get_mode(self):
self.progress = value
def set_active(self, value):
self.active = value
+ def set_finished(self, when):
+ self.finished = when
class ServerMap:
"""I record the placement of mutable shares.
(idlib.shortnodeid_b2a(peerid), shnum,
seqnum, base32.b2a(root_hash)[:4], k, N,
datalength))
+ if self.problems:
+ print >>out, "%d PROBLEMS" % len(self.problems)
+ for f in self.problems:
+ print >>out, str(f)
return out
def all_peers(self):
(verinfo, timestamp) = self.servermap[key]
return verinfo
return None
- return None
def shares_available(self):
"""Return a dict that maps verinfo to tuples of
# Return a dict of versionid -> health, for versions that are
# unrecoverable and have later seqnums than any recoverable versions.
# These indicate that a write will lose data.
- pass
+ versionmap = self.make_versionmap()
+ healths = {} # maps verinfo to (found,k)
+ unrecoverable = set()
+ highest_recoverable_seqnum = -1
+ for (verinfo, shares) in versionmap.items():
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = verinfo
+ shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ healths[verinfo] = (len(shnums),k)
+ if len(shnums) < k:
+ unrecoverable.add(verinfo)
+ else:
+ highest_recoverable_seqnum = max(seqnum,
+ highest_recoverable_seqnum)
+
+ newversions = {}
+ for verinfo in unrecoverable:
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = verinfo
+ if seqnum > highest_recoverable_seqnum:
+ newversions[verinfo] = healths[verinfo]
+
+ return newversions
+
def needs_merge(self):
# return True if there are multiple recoverable versions with the
# same seqnum, meaning that MutableFileNode.read_best_version is not
# giving you the whole story, and that using its data to do a
# subsequent publish will lose information.
- pass
+ return bool(len(self.recoverable_versions()) > 1)
+
class ServermapUpdater:
def __init__(self, filenode, servermap, mode=MODE_READ):
level=log.NOISY)
now = time.time()
elapsed = now - started
- self._status.add_per_server_time(peerid, "query", elapsed)
self._queries_outstanding.discard(peerid)
self._must_query.discard(peerid)
self._queries_completed += 1
if not self._running:
self.log("but we're not running, so we'll ignore it", parent=lp)
+ self._status.add_per_server_time(peerid, "late", started, elapsed)
return
+ self._status.add_per_server_time(peerid, "query", started, elapsed)
if datavs:
self._good_peers.add(peerid)
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = r
- return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
+ return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
def _got_privkey_results(self, datavs, peerid, shnum, started):
now = time.time()
elapsed = now - started
- self._status.add_per_server_time(peerid, "privkey", elapsed)
+ self._status.add_per_server_time(peerid, "privkey", started, elapsed)
self._queries_outstanding.discard(peerid)
if not self._need_privkey:
return
if not self._running:
return
self._running = False
- elapsed = time.time() - self._started
+ now = time.time()
+ elapsed = now - self._started
+ self._status.set_finished(now)
self._status.timings["total"] = elapsed
self._status.set_progress(1.0)
self._status.set_status("Done")