From 749c42fa2c7a0648983351a313be4648086698e4 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 16 Apr 2008 17:49:06 -0700 Subject: [PATCH] mutable WIP: re-enable publish/retrieve status --- src/allmydata/client.py | 8 +- src/allmydata/mutable/node.py | 84 ++++++++++---------- src/allmydata/mutable/publish.py | 101 +++++++++++++++++------- src/allmydata/mutable/retrieve.py | 51 ++++++++++-- src/allmydata/mutable/servermap.py | 7 ++ src/allmydata/test/test_keygen.py | 1 + src/allmydata/web/publish-status.xhtml | 7 -- src/allmydata/web/retrieve-status.xhtml | 7 +- src/allmydata/web/status.py | 44 ++--------- 9 files changed, 176 insertions(+), 134 deletions(-) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 134ed9fd..2cd5d484 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -300,10 +300,10 @@ class Client(node.Node, testutil.PollMixin): assert IMutableFileURI.providedBy(u), u return MutableFileNode(self).init_from_uri(u) - def notify_publish(self, p): - self.getServiceNamed("mutable-watcher").notify_publish(p) - def notify_retrieve(self, r): - self.getServiceNamed("mutable-watcher").notify_retrieve(r) + def notify_publish(self, publish_status): + self.getServiceNamed("mutable-watcher").notify_publish(publish_status) + def notify_retrieve(self, retrieve_status): + self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status) def create_empty_dirnode(self): n = NewDirectoryNode(self) diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 52389297..c19e0396 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -22,8 +22,6 @@ from retrieve import Retrieve class MutableFileNode: implements(IMutableFileNode) - publish_class = Publish - retrieve_class = Retrieve SIGNATURE_KEY_SIZE = 2048 DEFAULT_ENCODING = (3, 10) @@ -90,7 +88,7 @@ class MutableFileNode: # nobody knows about us yet" self._current_seqnum = 0 self._current_roothash = "\x00"*32 - return self._publish(initial_contents) + return self._publish(None, initial_contents) d.addCallback(_generated) return d @@ -225,17 +223,13 @@ class MutableFileNode: def download_version(self, servermap, versionid): """Returns a Deferred that fires with a string.""" d = self.obtain_lock() - d.addCallback(lambda res: - Retrieve(self, servermap, versionid).download()) + d.addCallback(lambda res: self._retrieve(servermap, versionid)) d.addBoth(self.release_lock) return d - def publish(self, servermap, newdata): - assert self._pubkey, "update_servermap must be called before publish" + def publish(self, servermap, new_contents): d = self.obtain_lock() - d.addCallback(lambda res: Publish(self, servermap).publish(newdata)) - # p = self.publish_class(self) - # self._client.notify_publish(p) + d.addCallback(lambda res: self._publish(servermap, new_contents)) d.addBoth(self.release_lock) return d @@ -269,16 +263,11 @@ class MutableFileNode: verifier = self.get_verifier() return self._client.getServiceNamed("checker").check(verifier) - def download(self, target): - # fake it. TODO: make this cleaner. - d = self.download_to_data() - def _done(data): - target.open(len(data)) - target.write(data) - target.close() - return target.finish() - d.addCallback(_done) - return d + + def _retrieve(self, servermap, verinfo): + r = Retrieve(self, servermap, verinfo) + self._client.notify_retrieve(r.get_status()) + return r.download() def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ): d = self.update_servermap(old_map=old_map, mode=mode) @@ -306,22 +295,33 @@ class MutableFileNode: d.addBoth(self.release_lock) return d - def _publish(self, initial_contents): - p = Publish(self, None) - d = p.publish(initial_contents) - d.addCallback(lambda res: self) + def download(self, target): + # fake it. TODO: make this cleaner. + d = self.download_to_data() + def _done(data): + target.open(len(data)) + target.write(data) + target.close() + return target.finish() + d.addCallback(_done) return d - def update(self, newdata): + + def _publish(self, servermap, new_contents): + assert self._pubkey, "update_servermap must be called before publish" + p = Publish(self, servermap) + self._client.notify_publish(p.get_status()) + return p.publish(new_contents) + + def update(self, new_contents): d = self.obtain_lock() d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE)) - d.addCallback(lambda smap: - Publish(self, smap).publish(newdata)) + d.addCallback(self._publish, new_contents) d.addBoth(self.release_lock) return d - def overwrite(self, newdata): - return self.update(newdata) + def overwrite(self, new_contents): + return self.update(new_contents) class MutableWatcher(service.MultiService): @@ -332,42 +332,40 @@ class MutableWatcher(service.MultiService): def __init__(self, stats_provider=None): service.MultiService.__init__(self) self.stats_provider = stats_provider - self._all_publish = weakref.WeakKeyDictionary() + self._all_publish_status = weakref.WeakKeyDictionary() self._recent_publish_status = [] - self._all_retrieve = weakref.WeakKeyDictionary() + self._all_retrieve_status = weakref.WeakKeyDictionary() self._recent_retrieve_status = [] def notify_publish(self, p): - self._all_publish[p] = None - self._recent_publish_status.append(p.get_status()) + self._all_publish_status[p] = None + self._recent_publish_status.append(p) if self.stats_provider: self.stats_provider.count('mutable.files_published', 1) - #self.stats_provider.count('mutable.bytes_published', p._node.get_size()) + self.stats_provider.count('mutable.bytes_published', p.get_size()) while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: self._recent_publish_status.pop(0) def list_all_publish(self): - return self._all_publish.keys() + return self._all_publish_status.keys() def list_active_publish(self): - return [p.get_status() for p in self._all_publish.keys() - if p.get_status().get_active()] + return [p for p in self._all_publish_status.keys() if p.get_active()] def list_recent_publish(self): return self._recent_publish_status def notify_retrieve(self, r): - self._all_retrieve[r] = None - self._recent_retrieve_status.append(r.get_status()) + self._all_retrieve_status[r] = None + self._recent_retrieve_status.append(r) if self.stats_provider: self.stats_provider.count('mutable.files_retrieved', 1) - #self.stats_provider.count('mutable.bytes_retrieved', r._node.get_size()) + self.stats_provider.count('mutable.bytes_retrieved', r.get_size()) while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: self._recent_retrieve_status.pop(0) def list_all_retrieve(self): - return self._all_retrieve.keys() + return self._all_retrieve_status.keys() def list_active_retrieve(self): - return [p.get_status() for p in self._all_retrieve.keys() - if p.get_status().get_active()] + return [p for p in self._all_retrieve_status.keys() if p.get_active()] def list_recent_retrieve(self): return self._recent_retrieve_status diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index b53c639a..acbe5cee 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -4,10 +4,12 @@ import os, struct, time from itertools import count from zope.interface import implements from twisted.internet import defer +from twisted.python import failure from allmydata.interfaces import IPublishStatus from allmydata.util import base32, hashutil, mathutil, idlib, log from allmydata import hashtree, codec, storage from pycryptopp.cipher.aes import AES +from foolscap.eventual import eventually from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets from servermap import ServerMap @@ -19,27 +21,23 @@ class PublishStatus: statusid_counter = count(0) def __init__(self): self.timings = {} - self.timings["per_server"] = {} - self.privkey_from = None - self.peers_queried = None - self.sharemap = None # DictOfSets + self.timings["send_per_server"] = {} + self.servermap = None self.problems = {} self.active = True self.storage_index = None self.helper = False self.encoding = ("?", "?") - self.initial_read_size = None self.size = None self.status = "Not started" self.progress = 0.0 self.counter = self.statusid_counter.next() self.started = time.time() - def add_per_server_time(self, peerid, op, elapsed): - assert op in ("read", "write") - if peerid not in self.timings["per_server"]: - self.timings["per_server"][peerid] = [] - self.timings["per_server"][peerid].append((op,elapsed)) + def add_per_server_time(self, peerid, elapsed): + if peerid not in self.timings["send_per_server"]: + self.timings["send_per_server"][peerid] = [] + self.timings["send_per_server"][peerid].append(elapsed) def get_started(self): return self.started @@ -49,6 +47,8 @@ class PublishStatus: return self.encoding def using_helper(self): return self.helper + def get_servermap(self): + return self.servermap def get_size(self): return self.size def get_status(self): @@ -64,6 +64,8 @@ class PublishStatus: self.storage_index = si def set_helper(self, helper): self.helper = helper + def set_servermap(self, servermap): + self.servermap = servermap def set_encoding(self, k, n): self.encoding = (k, n) def set_size(self, size): @@ -102,6 +104,13 @@ class Publish: self._log_number = num self._running = True + self._status = PublishStatus() + self._status.set_storage_index(self._storage_index) + self._status.set_helper(False) + self._status.set_progress(0.0) + self._status.set_active(True) + self._status.set_servermap(servermap) + def log(self, *args, **kwargs): if 'parent' not in kwargs: kwargs['parent'] = self._log_number @@ -129,6 +138,9 @@ class Publish: # 5: when enough responses are back, we're done self.log("starting publish, datalen is %s" % len(newdata)) + self._status.set_size(len(newdata)) + self._status.set_status("Started") + self._started = time.time() self.done_deferred = defer.Deferred() @@ -160,6 +172,8 @@ class Publish: assert self.required_shares is not None self.total_shares = self._node.get_total_shares() assert self.total_shares is not None + self._status.set_encoding(self.required_shares, self.total_shares) + self._pubkey = self._node.get_pubkey() assert self._pubkey self._privkey = self._node.get_privkey() @@ -209,8 +223,13 @@ class Publish: # create the shares. We'll discard these as they are delivered. SMDF: # we're allowed to hold everything in memory. + self._status.timings["setup"] = time.time() - self._started d = self._encrypt_and_encode() d.addCallback(self._generate_shares) + def _start_pushing(res): + self._started_pushing = time.time() + return res + d.addCallback(_start_pushing) d.addCallback(self.loop) # trigger delivery d.addErrback(self._fatal_error) @@ -233,11 +252,22 @@ class Publish: self.log("error during loop", failure=f, level=log.SCARY) self._done(f) + def _update_status(self): + self._status.set_status("Sending Shares: %d placed out of %d, " + "%d messages outstanding" % + (len(self.placed), + len(self.goal), + len(self.outstanding))) + self._status.set_progress(1.0 * len(self.placed) / len(self.goal)) + def loop(self, ignored=None): self.log("entering loop", level=log.NOISY) + if not self._running: + return self.update_goal() # how far are we from our goal? needed = self.goal - self.placed - self.outstanding + self._update_status() if needed: # we need to send out new shares @@ -258,6 +288,9 @@ class Publish: # no queries outstanding, no placements needed: we're done self.log("no queries outstanding, no placements needed: done", level=log.OPERATIONAL) + now = time.time() + elapsed = now - self._started_pushing + self._status.timings["push"] = elapsed return self._done(None) def log_goal(self, goal): @@ -331,19 +364,21 @@ class Publish: # shares that we care about. self.log("_encrypt_and_encode") - #started = time.time() + self._status.set_status("Encrypting") + started = time.time() key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey) enc = AES(key) crypttext = enc.process(self.newdata) assert len(crypttext) == len(self.newdata) - #now = time.time() - #self._status.timings["encrypt"] = now - started - #started = now + now = time.time() + self._status.timings["encrypt"] = now - started + started = now # now apply FEC + self._status.set_status("Encoding") fec = codec.CRSEncoder() fec.set_params(self.segment_size, self.required_shares, self.total_shares) @@ -358,8 +393,8 @@ class Publish: d = fec.encode(crypttext_pieces) def _done_encoding(res): - #elapsed = time.time() - started - #self._status.timings["encode"] = elapsed + elapsed = time.time() - started + self._status.timings["encode"] = elapsed return res d.addCallback(_done_encoding) return d @@ -367,7 +402,8 @@ class Publish: def _generate_shares(self, shares_and_shareids): # this sets self.shares and self.root_hash self.log("_generate_shares") - #started = time.time() + self._status.set_status("Generating Shares") + started = time.time() # we should know these by now privkey = self._privkey @@ -413,9 +449,9 @@ class Publish: # then they all share the same encprivkey at the end. The sizes # of everything are the same for all shares. - #sign_started = time.time() + sign_started = time.time() signature = privkey.sign(prefix) - #self._status.timings["sign"] = time.time() - sign_started + self._status.timings["sign"] = time.time() - sign_started verification_key = pubkey.serialize() @@ -429,8 +465,8 @@ class Publish: all_shares[shnum], encprivkey) final_shares[shnum] = final_share - #elapsed = time.time() - started - #self._status.timings["pack"] = elapsed + elapsed = time.time() - started + self._status.timings["pack"] = elapsed self.shares = final_shares self.root_hash = root_hash @@ -449,7 +485,6 @@ class Publish: def _send_shares(self, needed): self.log("_send_shares") - #started = time.time() # we're finally ready to send out our shares. If we encounter any # surprises here, it's because somebody else is writing at the same @@ -547,6 +582,7 @@ class Publish: d.addErrback(self._fatal_error) dl.append(d) + self._update_status() return defer.DeferredList(dl) # purely for testing def _do_testreadwrite(self, peerid, secrets, @@ -568,6 +604,10 @@ class Publish: for shnum in shnums: self.outstanding.discard( (peerid, shnum) ) + now = time.time() + elapsed = now - started + self._status.add_per_server_time(peerid, elapsed) + wrote, read_data = answer if not wrote: @@ -650,13 +690,16 @@ class Publish: if not self._running: return self._running = False - #now = time.time() - #self._status.timings["total"] = now - self._started - #self._status.set_active(False) - #self._status.set_status("Done") - #self._status.set_progress(1.0) - self.done_deferred.callback(res) - return None + now = time.time() + self._status.timings["total"] = now - self._started + self._status.set_active(False) + if isinstance(res, failure.Failure): + self.log("Retrieve done, with failure", failure=res) + self._status.set_status("Failed") + else: + self._status.set_status("Done") + self._status.set_progress(1.0) + eventually(self.done_deferred.callback, res) def get_status(self): return self._status diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 4281ce90..00663f4c 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -21,13 +21,11 @@ class RetrieveStatus: self.timings = {} self.timings["fetch_per_server"] = {} self.timings["cumulative_verify"] = 0.0 - self.sharemap = {} self.problems = {} self.active = True self.storage_index = None self.helper = False self.encoding = ("?","?") - self.search_distance = None self.size = None self.status = "Not started" self.progress = 0.0 @@ -40,8 +38,6 @@ class RetrieveStatus: return self.storage_index def get_encoding(self): return self.encoding - def get_search_distance(self): - return self.search_distance def using_helper(self): return self.helper def get_size(self): @@ -55,14 +51,16 @@ class RetrieveStatus: def get_counter(self): return self.counter + def add_fetch_timing(self, peerid, elapsed): + if peerid not in self.timings["fetch_per_server"]: + self.timings["fetch_per_server"][peerid] = [] + self.timings["fetch_per_server"][peerid].append(elapsed) def set_storage_index(self, si): self.storage_index = si def set_helper(self, helper): self.helper = helper def set_encoding(self, k, n): self.encoding = (k, n) - def set_search_distance(self, value): - self.search_distance = value def set_size(self, size): self.size = size def set_status(self, status): @@ -99,6 +97,19 @@ class Retrieve: assert self._node._pubkey self.verinfo = verinfo + self._status = RetrieveStatus() + self._status.set_storage_index(self._storage_index) + self._status.set_helper(False) + self._status.set_progress(0.0) + self._status.set_active(True) + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = self.verinfo + self._status.set_size(datalength) + self._status.set_encoding(k, N) + + def get_status(self): + return self._status + def log(self, *args, **kwargs): if "parent" not in kwargs: kwargs["parent"] = self._log_number @@ -106,6 +117,8 @@ class Retrieve: def download(self): self._done_deferred = defer.Deferred() + self._started = time.time() + self._status.set_status("Retrieving Shares") # first, which servers can we use? versionmap = self.servermap.make_versionmap() @@ -165,6 +178,7 @@ class Retrieve: self._outstanding_queries[m] = (peerid, shnum, started) # ask the cache first + got_from_cache = False datav = [] #for (offset, length) in readv: # (data, timestamp) = self._node._cache.read(self.verinfo, shnum, @@ -173,13 +187,14 @@ class Retrieve: # datav.append(data) if len(datav) == len(readv): self.log("got data from cache") + got_from_cache = True d = defer.succeed(datav) else: self.remaining_sharemap[shnum].remove(peerid) d = self._do_read(ss, peerid, self._storage_index, [shnum], readv) d.addCallback(self._fill_cache, readv) - d.addCallback(self._got_results, m, peerid, started) + d.addCallback(self._got_results, m, peerid, started, got_from_cache) d.addErrback(self._query_failed, m, peerid) # errors that aren't handled by _query_failed (and errors caused by # _query_failed) get logged, but we still want to check for doneness. @@ -216,7 +231,11 @@ class Retrieve: for shnum in list(self.remaining_sharemap.keys()): self.remaining_sharemap.discard(shnum, peerid) - def _got_results(self, datavs, marker, peerid, started): + def _got_results(self, datavs, marker, peerid, started, got_from_cache): + now = time.time() + elapsed = now - started + if not got_from_cache: + self._status.add_fetch_timing(peerid, elapsed) self.log(format="got results (%(shares)d shares) from [%(peerid)s]", shares=len(datavs), peerid=idlib.shortnodeid_b2a(peerid), @@ -241,6 +260,7 @@ class Retrieve: self.remove_peer(peerid) self.servermap.mark_bad_share(peerid, shnum) self._bad_shares.add( (peerid, shnum) ) + self._status.problems[peerid] = f self._last_failure = f pass # all done! @@ -284,6 +304,7 @@ class Retrieve: self.log(format="query to [%(peerid)s] failed", peerid=idlib.shortnodeid_b2a(peerid), level=log.NOISY) + self._status.problems[peerid] = f self._outstanding_queries.pop(marker, None) if not self._running: return @@ -317,6 +338,10 @@ class Retrieve: # to fix it, so the download will fail. self._decoding = True # avoid reentrancy + self._status.set_status("decoding") + now = time.time() + elapsed = now - self._started + self._status.timings["fetch"] = elapsed d = defer.maybeDeferred(self._decode) d.addCallback(self._decrypt, IV, self._node._readkey) @@ -366,6 +391,7 @@ class Retrieve: peerid = list(self.remaining_sharemap[shnum])[0] # get_data will remove that peerid from the sharemap, and add the # query to self._outstanding_queries + self._status.set_status("Retrieving More Shares") self.get_data(shnum, peerid) needed -= 1 if not needed: @@ -400,6 +426,7 @@ class Retrieve: return def _decode(self): + started = time.time() (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, offsets_tuple) = self.verinfo @@ -423,6 +450,7 @@ class Retrieve: self.log("about to decode, shareids=%s" % (shareids,)) d = defer.maybeDeferred(fec.decode, shares, shareids) def _done(buffers): + self._status.timings["decode"] = time.time() - started self.log(" decode done, %d buffers" % len(buffers)) segment = "".join(buffers) self.log(" joined length %d, datalength %d" % @@ -438,21 +466,28 @@ class Retrieve: return d def _decrypt(self, crypttext, IV, readkey): + self._status.set_status("decrypting") started = time.time() key = hashutil.ssk_readkey_data_hash(IV, readkey) decryptor = AES(key) plaintext = decryptor.process(crypttext) + self._status.timings["decrypt"] = time.time() - started return plaintext def _done(self, res): if not self._running: return self._running = False + self._status.set_active(False) + self._status.timings["total"] = time.time() - self._started # res is either the new contents, or a Failure if isinstance(res, failure.Failure): self.log("Retrieve done, with failure", failure=res) + self._status.set_status("Failed") else: self.log("Retrieve done, success!") + self._status.set_status("Done") + self._status.set_progress(1.0) # remember the encoding parameters, use them again next time (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, offsets_tuple) = self.verinfo diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index ccfb50d3..edd2a8d6 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -85,6 +85,13 @@ class ServerMap: for (peerid, shnum) in self.servermap]) + def make_sharemap(self): + """Return a dict that maps shnum to a set of peerds that hold it.""" + sharemap = DictOfSets() + for (peerid, shnum) in self.servermap: + sharemap.add(shnum, peerid) + return sharemap + def make_versionmap(self): """Return a dict that maps versionid to sets of (shnum, peerid, timestamp) tuples.""" diff --git a/src/allmydata/test/test_keygen.py b/src/allmydata/test/test_keygen.py index 38c54d3f..a6f354d6 100644 --- a/src/allmydata/test/test_keygen.py +++ b/src/allmydata/test/test_keygen.py @@ -25,6 +25,7 @@ class KeyGenService(unittest.TestCase, testutil.PollMixin): t.setServiceParent(self.parent) t.listenOn("tcp:0") t.setLocationAutomatically() + return eventual.fireEventually() def tearDown(self): d = self.parent.stopService() diff --git a/src/allmydata/web/publish-status.xhtml b/src/allmydata/web/publish-status.xhtml index 9250371b..cb089ee2 100644 --- a/src/allmydata/web/publish-status.xhtml +++ b/src/allmydata/web/publish-status.xhtml @@ -22,7 +22,6 @@

Retrieve Results