git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable WIP: re-enable publish/retrieve status
author Brian Warner <warner@allmydata.com>
Thu, 17 Apr 2008 00:49:06 +0000 (17:49 -0700)
committer Brian Warner <warner@allmydata.com>
Thu, 17 Apr 2008 00:49:06 +0000 (17:49 -0700)
src/allmydata/client.py
src/allmydata/mutable/node.py
src/allmydata/mutable/publish.py
src/allmydata/mutable/retrieve.py
src/allmydata/mutable/servermap.py
src/allmydata/test/test_keygen.py
src/allmydata/web/publish-status.xhtml
src/allmydata/web/retrieve-status.xhtml
src/allmydata/web/status.py

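For orientation, the status wiring this commit restores works roughly as sketched below: Publish and Retrieve build a status object when they are constructed, the node hands that object to the client before the operation starts, and the client forwards it to the "mutable-watcher" service. This is a simplified illustration condensed from the diffs that follow, not code taken verbatim from the tree.

    # Illustrative sketch of the notify flow (condensed from mutable/node.py
    # and client.py in this commit).
    class MutableFileNode:
        def _publish(self, servermap, new_contents):
            p = Publish(self, servermap)                 # Publish.__init__ builds a PublishStatus
            self._client.notify_publish(p.get_status())  # register the status before starting work
            return p.publish(new_contents)

        def _retrieve(self, servermap, verinfo):
            r = Retrieve(self, servermap, verinfo)       # Retrieve.__init__ builds a RetrieveStatus
            self._client.notify_retrieve(r.get_status())
            return r.download()

    class Client:
        def notify_publish(self, publish_status):
            # the watcher keeps the status around for the web status pages
            self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
        def notify_retrieve(self, retrieve_status):
            self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
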
index 134ed9fd8880276becf92042c4b47877ecd1131e..2cd5d48450ce0197b32115922f15332615c1d2d2 100644 (file)
@@ -300,10 +300,10 @@ class Client(node.Node, testutil.PollMixin):
         assert IMutableFileURI.providedBy(u), u
         return MutableFileNode(self).init_from_uri(u)
 
-    def notify_publish(self, p):
-        self.getServiceNamed("mutable-watcher").notify_publish(p)
-    def notify_retrieve(self, r):
-        self.getServiceNamed("mutable-watcher").notify_retrieve(r)
+    def notify_publish(self, publish_status):
+        self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
+    def notify_retrieve(self, retrieve_status):
+        self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
 
     def create_empty_dirnode(self):
         n = NewDirectoryNode(self)
index 52389297408709c029d47a6eaa44e726e032c1b8..c19e0396e6d35cf170d4f2b4676607b77836acce 100644 (file)
@@ -22,8 +22,6 @@ from retrieve import Retrieve
 
 class MutableFileNode:
     implements(IMutableFileNode)
-    publish_class = Publish
-    retrieve_class = Retrieve
     SIGNATURE_KEY_SIZE = 2048
     DEFAULT_ENCODING = (3, 10)
 
@@ -90,7 +88,7 @@ class MutableFileNode:
             # nobody knows about us yet"
             self._current_seqnum = 0
             self._current_roothash = "\x00"*32
-            return self._publish(initial_contents)
+            return self._publish(None, initial_contents)
         d.addCallback(_generated)
         return d
 
@@ -225,17 +223,13 @@ class MutableFileNode:
     def download_version(self, servermap, versionid):
         """Returns a Deferred that fires with a string."""
         d = self.obtain_lock()
-        d.addCallback(lambda res:
-                      Retrieve(self, servermap, versionid).download())
+        d.addCallback(lambda res: self._retrieve(servermap, versionid))
         d.addBoth(self.release_lock)
         return d
 
-    def publish(self, servermap, newdata):
-        assert self._pubkey, "update_servermap must be called before publish"
+    def publish(self, servermap, new_contents):
         d = self.obtain_lock()
-        d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
-        # p = self.publish_class(self)
-        # self._client.notify_publish(p)
+        d.addCallback(lambda res: self._publish(servermap, new_contents))
         d.addBoth(self.release_lock)
         return d
 
@@ -269,16 +263,11 @@ class MutableFileNode:
         verifier = self.get_verifier()
         return self._client.getServiceNamed("checker").check(verifier)
 
-    def download(self, target):
-        # fake it. TODO: make this cleaner.
-        d = self.download_to_data()
-        def _done(data):
-            target.open(len(data))
-            target.write(data)
-            target.close()
-            return target.finish()
-        d.addCallback(_done)
-        return d
+
+    def _retrieve(self, servermap, verinfo):
+        r = Retrieve(self, servermap, verinfo)
+        self._client.notify_retrieve(r.get_status())
+        return r.download()
 
     def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
         d = self.update_servermap(old_map=old_map, mode=mode)
@@ -306,22 +295,33 @@ class MutableFileNode:
         d.addBoth(self.release_lock)
         return d
 
-    def _publish(self, initial_contents):
-        p = Publish(self, None)
-        d = p.publish(initial_contents)
-        d.addCallback(lambda res: self)
+    def download(self, target):
+        # fake it. TODO: make this cleaner.
+        d = self.download_to_data()
+        def _done(data):
+            target.open(len(data))
+            target.write(data)
+            target.close()
+            return target.finish()
+        d.addCallback(_done)
         return d
 
-    def update(self, newdata):
+
+    def _publish(self, servermap, new_contents):
+        assert self._pubkey, "update_servermap must be called before publish"
+        p = Publish(self, servermap)
+        self._client.notify_publish(p.get_status())
+        return p.publish(new_contents)
+
+    def update(self, new_contents):
         d = self.obtain_lock()
         d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
-        d.addCallback(lambda smap:
-                      Publish(self, smap).publish(newdata))
+        d.addCallback(self._publish, new_contents)
         d.addBoth(self.release_lock)
         return d
 
-    def overwrite(self, newdata):
-        return self.update(newdata)
+    def overwrite(self, new_contents):
+        return self.update(new_contents)
 
 
 class MutableWatcher(service.MultiService):
@@ -332,42 +332,40 @@ class MutableWatcher(service.MultiService):
     def __init__(self, stats_provider=None):
         service.MultiService.__init__(self)
         self.stats_provider = stats_provider
-        self._all_publish = weakref.WeakKeyDictionary()
+        self._all_publish_status = weakref.WeakKeyDictionary()
         self._recent_publish_status = []
-        self._all_retrieve = weakref.WeakKeyDictionary()
+        self._all_retrieve_status = weakref.WeakKeyDictionary()
         self._recent_retrieve_status = []
 
     def notify_publish(self, p):
-        self._all_publish[p] = None
-        self._recent_publish_status.append(p.get_status())
+        self._all_publish_status[p] = None
+        self._recent_publish_status.append(p)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_published', 1)
-            #self.stats_provider.count('mutable.bytes_published', p._node.get_size())
+            self.stats_provider.count('mutable.bytes_published', p.get_size())
         while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
             self._recent_publish_status.pop(0)
 
     def list_all_publish(self):
-        return self._all_publish.keys()
+        return self._all_publish_status.keys()
     def list_active_publish(self):
-        return [p.get_status() for p in self._all_publish.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_publish_status.keys() if p.get_active()]
     def list_recent_publish(self):
         return self._recent_publish_status
 
 
     def notify_retrieve(self, r):
-        self._all_retrieve[r] = None
-        self._recent_retrieve_status.append(r.get_status())
+        self._all_retrieve_status[r] = None
+        self._recent_retrieve_status.append(r)
         if self.stats_provider:
             self.stats_provider.count('mutable.files_retrieved', 1)
-            #self.stats_provider.count('mutable.bytes_retrieved', r._node.get_size())
+            self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
         while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
             self._recent_retrieve_status.pop(0)
 
     def list_all_retrieve(self):
-        return self._all_retrieve.keys()
+        return self._all_retrieve_status.keys()
     def list_active_retrieve(self):
-        return [p.get_status() for p in self._all_retrieve.keys()
-                if p.get_status().get_active()]
+        return [p for p in self._all_retrieve_status.keys() if p.get_active()]
     def list_recent_retrieve(self):
         return self._recent_retrieve_status
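On the watcher side, the same handoff in outline: the full set of statuses is held behind weak references (so entries disappear once nothing else references them), a bounded list keeps the most recent ones for display, and "active" is just a filter on get_active(). A self-contained sketch follows, with an illustrative cap value and a stubbed status class, neither taken from the commit itself.

    import weakref

    class PublishStatus(object):
        """Minimal stand-in for the real PublishStatus in mutable/publish.py."""
        def __init__(self):
            self.active = True
        def get_active(self):
            return self.active
        def set_active(self, value):
            self.active = value

    class MutableWatcher(object):
        MAX_PUBLISH_STATUSES = 20   # illustrative cap, not taken from the commit

        def __init__(self):
            # weak keys: a status is tracked only while something else still holds it
            self._all_publish_status = weakref.WeakKeyDictionary()
            self._recent_publish_status = []

        def notify_publish(self, status):
            self._all_publish_status[status] = None
            self._recent_publish_status.append(status)
            while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
                self._recent_publish_status.pop(0)

        def list_active_publish(self):
            # the status objects themselves report whether the operation is still running
            return [s for s in self._all_publish_status.keys() if s.get_active()]
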
index b53c639ae806c0b073239d4fa458c6c9eb22ced3..acbe5cee6bae655ebb8df54abb9eda8bc39202b1 100644 (file)
@@ -4,10 +4,12 @@ import os, struct, time
 from itertools import count
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.python import failure
 from allmydata.interfaces import IPublishStatus
 from allmydata.util import base32, hashutil, mathutil, idlib, log
 from allmydata import hashtree, codec, storage
 from pycryptopp.cipher.aes import AES
+from foolscap.eventual import eventually
 
 from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
 from servermap import ServerMap
@@ -19,27 +21,23 @@ class PublishStatus:
     statusid_counter = count(0)
     def __init__(self):
         self.timings = {}
-        self.timings["per_server"] = {}
-        self.privkey_from = None
-        self.peers_queried = None
-        self.sharemap = None # DictOfSets
+        self.timings["send_per_server"] = {}
+        self.servermap = None
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?", "?")
-        self.initial_read_size = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
 
-    def add_per_server_time(self, peerid, op, elapsed):
-        assert op in ("read", "write")
-        if peerid not in self.timings["per_server"]:
-            self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,elapsed))
+    def add_per_server_time(self, peerid, elapsed):
+        if peerid not in self.timings["send_per_server"]:
+            self.timings["send_per_server"][peerid] = []
+        self.timings["send_per_server"][peerid].append(elapsed)
 
     def get_started(self):
         return self.started
@@ -49,6 +47,8 @@ class PublishStatus:
         return self.encoding
     def using_helper(self):
         return self.helper
+    def get_servermap(self):
+        return self.servermap
     def get_size(self):
         return self.size
     def get_status(self):
@@ -64,6 +64,8 @@ class PublishStatus:
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
+    def set_servermap(self, servermap):
+        self.servermap = servermap
     def set_encoding(self, k, n):
         self.encoding = (k, n)
     def set_size(self, size):
@@ -102,6 +104,13 @@ class Publish:
         self._log_number = num
         self._running = True
 
+        self._status = PublishStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        self._status.set_servermap(servermap)
+
     def log(self, *args, **kwargs):
         if 'parent' not in kwargs:
             kwargs['parent'] = self._log_number
@@ -129,6 +138,9 @@ class Publish:
         # 5: when enough responses are back, we're done
 
         self.log("starting publish, datalen is %s" % len(newdata))
+        self._status.set_size(len(newdata))
+        self._status.set_status("Started")
+        self._started = time.time()
 
         self.done_deferred = defer.Deferred()
 
@@ -160,6 +172,8 @@ class Publish:
         assert self.required_shares is not None
         self.total_shares = self._node.get_total_shares()
         assert self.total_shares is not None
+        self._status.set_encoding(self.required_shares, self.total_shares)
+
         self._pubkey = self._node.get_pubkey()
         assert self._pubkey
         self._privkey = self._node.get_privkey()
@@ -209,8 +223,13 @@ class Publish:
         # create the shares. We'll discard these as they are delivered. SMDF:
         # we're allowed to hold everything in memory.
 
+        self._status.timings["setup"] = time.time() - self._started
         d = self._encrypt_and_encode()
         d.addCallback(self._generate_shares)
+        def _start_pushing(res):
+            self._started_pushing = time.time()
+            return res
+        d.addCallback(_start_pushing)
         d.addCallback(self.loop) # trigger delivery
         d.addErrback(self._fatal_error)
 
@@ -233,11 +252,22 @@ class Publish:
         self.log("error during loop", failure=f, level=log.SCARY)
         self._done(f)
 
+    def _update_status(self):
+        self._status.set_status("Sending Shares: %d placed out of %d, "
+                                "%d messages outstanding" %
+                                (len(self.placed),
+                                 len(self.goal),
+                                 len(self.outstanding)))
+        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
     def loop(self, ignored=None):
         self.log("entering loop", level=log.NOISY)
+        if not self._running:
+            return
         self.update_goal()
         # how far are we from our goal?
         needed = self.goal - self.placed - self.outstanding
+        self._update_status()
 
         if needed:
             # we need to send out new shares
@@ -258,6 +288,9 @@ class Publish:
         # no queries outstanding, no placements needed: we're done
         self.log("no queries outstanding, no placements needed: done",
                  level=log.OPERATIONAL)
+        now = time.time()
+        elapsed = now - self._started_pushing
+        self._status.timings["push"] = elapsed
         return self._done(None)
 
     def log_goal(self, goal):
@@ -331,19 +364,21 @@ class Publish:
         # shares that we care about.
         self.log("_encrypt_and_encode")
 
-        #started = time.time()
+        self._status.set_status("Encrypting")
+        started = time.time()
 
         key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
         enc = AES(key)
         crypttext = enc.process(self.newdata)
         assert len(crypttext) == len(self.newdata)
 
-        #now = time.time()
-        #self._status.timings["encrypt"] = now - started
-        #started = now
+        now = time.time()
+        self._status.timings["encrypt"] = now - started
+        started = now
 
         # now apply FEC
 
+        self._status.set_status("Encoding")
         fec = codec.CRSEncoder()
         fec.set_params(self.segment_size,
                        self.required_shares, self.total_shares)
@@ -358,8 +393,8 @@ class Publish:
 
         d = fec.encode(crypttext_pieces)
         def _done_encoding(res):
-            #elapsed = time.time() - started
-            #self._status.timings["encode"] = elapsed
+            elapsed = time.time() - started
+            self._status.timings["encode"] = elapsed
             return res
         d.addCallback(_done_encoding)
         return d
@@ -367,7 +402,8 @@ class Publish:
     def _generate_shares(self, shares_and_shareids):
         # this sets self.shares and self.root_hash
         self.log("_generate_shares")
-        #started = time.time()
+        self._status.set_status("Generating Shares")
+        started = time.time()
 
         # we should know these by now
         privkey = self._privkey
@@ -413,9 +449,9 @@ class Publish:
         # then they all share the same encprivkey at the end. The sizes
         # of everything are the same for all shares.
 
-        #sign_started = time.time()
+        sign_started = time.time()
         signature = privkey.sign(prefix)
-        #self._status.timings["sign"] = time.time() - sign_started
+        self._status.timings["sign"] = time.time() - sign_started
 
         verification_key = pubkey.serialize()
 
@@ -429,8 +465,8 @@ class Publish:
                                      all_shares[shnum],
                                      encprivkey)
             final_shares[shnum] = final_share
-        #elapsed = time.time() - started
-        #self._status.timings["pack"] = elapsed
+        elapsed = time.time() - started
+        self._status.timings["pack"] = elapsed
         self.shares = final_shares
         self.root_hash = root_hash
 
@@ -449,7 +485,6 @@ class Publish:
 
     def _send_shares(self, needed):
         self.log("_send_shares")
-        #started = time.time()
 
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
@@ -547,6 +582,7 @@ class Publish:
             d.addErrback(self._fatal_error)
             dl.append(d)
 
+        self._update_status()
         return defer.DeferredList(dl) # purely for testing
 
     def _do_testreadwrite(self, peerid, secrets,
@@ -568,6 +604,10 @@ class Publish:
         for shnum in shnums:
             self.outstanding.discard( (peerid, shnum) )
 
+        now = time.time()
+        elapsed = now - started
+        self._status.add_per_server_time(peerid, elapsed)
+
         wrote, read_data = answer
 
         if not wrote:
@@ -650,13 +690,16 @@ class Publish:
         if not self._running:
             return
         self._running = False
-        #now = time.time()
-        #self._status.timings["total"] = now - self._started
-        #self._status.set_active(False)
-        #self._status.set_status("Done")
-        #self._status.set_progress(1.0)
-        self.done_deferred.callback(res)
-        return None
+        now = time.time()
+        self._status.timings["total"] = now - self._started
+        self._status.set_active(False)
+        if isinstance(res, failure.Failure):
+            self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
+        else:
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
+        eventually(self.done_deferred.callback, res)
 
     def get_status(self):
         return self._status
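One behavioural change worth calling out in publish.py: _done() now fires done_deferred through foolscap's eventually() instead of calling it synchronously, so the callback runs on a later reactor turn and the code that triggered _done() is never re-entered through the deferred's callback chain. A minimal sketch of that pattern, assuming twisted and foolscap are importable as they already are in this module:

    from twisted.internet import defer
    from foolscap.eventual import eventually

    class Operation(object):
        def __init__(self):
            self.done_deferred = defer.Deferred()
            self._running = True

        def _done(self, res):
            if not self._running:
                return
            self._running = False
            # queue the callback for a subsequent reactor turn rather than
            # firing it while the caller's stack is still unwinding
            eventually(self.done_deferred.callback, res)
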
index 4281ce90cfc12cc90cebdf6fa91bae2f3890f5fd..00663f4c15742124df564a76c7e5abad379c06e8 100644 (file)
@@ -21,13 +21,11 @@ class RetrieveStatus:
         self.timings = {}
         self.timings["fetch_per_server"] = {}
         self.timings["cumulative_verify"] = 0.0
-        self.sharemap = {}
         self.problems = {}
         self.active = True
         self.storage_index = None
         self.helper = False
         self.encoding = ("?","?")
-        self.search_distance = None
         self.size = None
         self.status = "Not started"
         self.progress = 0.0
@@ -40,8 +38,6 @@ class RetrieveStatus:
         return self.storage_index
     def get_encoding(self):
         return self.encoding
-    def get_search_distance(self):
-        return self.search_distance
     def using_helper(self):
         return self.helper
     def get_size(self):
@@ -55,14 +51,16 @@ class RetrieveStatus:
     def get_counter(self):
         return self.counter
 
+    def add_fetch_timing(self, peerid, elapsed):
+        if peerid not in self.timings["fetch_per_server"]:
+            self.timings["fetch_per_server"][peerid] = []
+        self.timings["fetch_per_server"][peerid].append(elapsed)
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
         self.helper = helper
     def set_encoding(self, k, n):
         self.encoding = (k, n)
-    def set_search_distance(self, value):
-        self.search_distance = value
     def set_size(self, size):
         self.size = size
     def set_status(self, status):
@@ -99,6 +97,19 @@ class Retrieve:
         assert self._node._pubkey
         self.verinfo = verinfo
 
+        self._status = RetrieveStatus()
+        self._status.set_storage_index(self._storage_index)
+        self._status.set_helper(False)
+        self._status.set_progress(0.0)
+        self._status.set_active(True)
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.verinfo
+        self._status.set_size(datalength)
+        self._status.set_encoding(k, N)
+
+    def get_status(self):
+        return self._status
+
     def log(self, *args, **kwargs):
         if "parent" not in kwargs:
             kwargs["parent"] = self._log_number
@@ -106,6 +117,8 @@ class Retrieve:
 
     def download(self):
         self._done_deferred = defer.Deferred()
+        self._started = time.time()
+        self._status.set_status("Retrieving Shares")
 
         # first, which servers can we use?
         versionmap = self.servermap.make_versionmap()
@@ -165,6 +178,7 @@ class Retrieve:
         self._outstanding_queries[m] = (peerid, shnum, started)
 
         # ask the cache first
+        got_from_cache = False
         datav = []
         #for (offset, length) in readv:
         #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
@@ -173,13 +187,14 @@ class Retrieve:
         #        datav.append(data)
         if len(datav) == len(readv):
             self.log("got data from cache")
+            got_from_cache = True
             d = defer.succeed(datav)
         else:
             self.remaining_sharemap[shnum].remove(peerid)
             d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
             d.addCallback(self._fill_cache, readv)
 
-        d.addCallback(self._got_results, m, peerid, started)
+        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
         d.addErrback(self._query_failed, m, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
@@ -216,7 +231,11 @@ class Retrieve:
         for shnum in list(self.remaining_sharemap.keys()):
             self.remaining_sharemap.discard(shnum, peerid)
 
-    def _got_results(self, datavs, marker, peerid, started):
+    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
+        now = time.time()
+        elapsed = now - started
+        if not got_from_cache:
+            self._status.add_fetch_timing(peerid, elapsed)
         self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                  shares=len(datavs),
                  peerid=idlib.shortnodeid_b2a(peerid),
@@ -241,6 +260,7 @@ class Retrieve:
                 self.remove_peer(peerid)
                 self.servermap.mark_bad_share(peerid, shnum)
                 self._bad_shares.add( (peerid, shnum) )
+                self._status.problems[peerid] = f
                 self._last_failure = f
                 pass
         # all done!
@@ -284,6 +304,7 @@ class Retrieve:
         self.log(format="query to [%(peerid)s] failed",
                  peerid=idlib.shortnodeid_b2a(peerid),
                  level=log.NOISY)
+        self._status.problems[peerid] = f
         self._outstanding_queries.pop(marker, None)
         if not self._running:
             return
@@ -317,6 +338,10 @@ class Retrieve:
         # to fix it, so the download will fail.
 
         self._decoding = True # avoid reentrancy
+        self._status.set_status("decoding")
+        now = time.time()
+        elapsed = now - self._started
+        self._status.timings["fetch"] = elapsed
 
         d = defer.maybeDeferred(self._decode)
         d.addCallback(self._decrypt, IV, self._node._readkey)
@@ -366,6 +391,7 @@ class Retrieve:
             peerid = list(self.remaining_sharemap[shnum])[0]
             # get_data will remove that peerid from the sharemap, and add the
             # query to self._outstanding_queries
+            self._status.set_status("Retrieving More Shares")
             self.get_data(shnum, peerid)
             needed -= 1
             if not needed:
@@ -400,6 +426,7 @@ class Retrieve:
         return
 
     def _decode(self):
+        started = time.time()
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
          offsets_tuple) = self.verinfo
 
@@ -423,6 +450,7 @@ class Retrieve:
         self.log("about to decode, shareids=%s" % (shareids,))
         d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
+            self._status.timings["decode"] = time.time() - started
             self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             self.log(" joined length %d, datalength %d" %
@@ -438,21 +466,28 @@ class Retrieve:
         return d
 
     def _decrypt(self, crypttext, IV, readkey):
+        self._status.set_status("decrypting")
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         decryptor = AES(key)
         plaintext = decryptor.process(crypttext)
+        self._status.timings["decrypt"] = time.time() - started
         return plaintext
 
     def _done(self, res):
         if not self._running:
             return
         self._running = False
+        self._status.set_active(False)
+        self._status.timings["total"] = time.time() - self._started
         # res is either the new contents, or a Failure
         if isinstance(res, failure.Failure):
             self.log("Retrieve done, with failure", failure=res)
+            self._status.set_status("Failed")
         else:
             self.log("Retrieve done, success!")
+            self._status.set_status("Done")
+            self._status.set_progress(1.0)
             # remember the encoding parameters, use them again next time
             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
              offsets_tuple) = self.verinfo
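The timing buckets RetrieveStatus accumulates in this file ("fetch_per_server", "fetch", "decode", "decrypt", "total") are what the retrieve-status page renders. As a hypothetical illustration, not part of this commit, the per-server lists could be summarized like this:

    def average_fetch_times(retrieve_status):
        """Hypothetical helper: average the per-peer fetch times recorded
        via RetrieveStatus.add_fetch_timing()."""
        averages = {}
        for peerid, elapsed_list in retrieve_status.timings["fetch_per_server"].items():
            averages[peerid] = sum(elapsed_list) / len(elapsed_list)
        return averages
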
index ccfb50d337a6423ae4c6352be8cfe08bc1404cfb..edd2a8d66417670983d38bef2b92e077a840a8b2 100644 (file)
@@ -85,6 +85,13 @@ class ServerMap:
                     for (peerid, shnum)
                     in self.servermap])
 
+    def make_sharemap(self):
+        """Return a dict that maps shnum to a set of peerids that hold it."""
+        sharemap = DictOfSets()
+        for (peerid, shnum) in self.servermap:
+            sharemap.add(shnum, peerid)
+        return sharemap
+
     def make_versionmap(self):
         """Return a dict that maps versionid to sets of (shnum, peerid,
         timestamp) tuples."""
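The new make_sharemap() simply inverts the (peerid, shnum) pairs held in self.servermap into a shnum-to-peers mapping, using the DictOfSets helper. An equivalent sketch with a plain dict of sets, assuming the same pair structure:

    def make_sharemap(servermap_pairs):
        """Invert an iterable of (peerid, shnum) pairs into {shnum: set(peerids)}."""
        sharemap = {}
        for (peerid, shnum) in servermap_pairs:
            sharemap.setdefault(shnum, set()).add(peerid)
        return sharemap

    # make_sharemap([("peerA", 0), ("peerB", 0), ("peerB", 1)])
    #   == {0: set(["peerA", "peerB"]), 1: set(["peerB"])}
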
index 38c54d3fea39abd1eedd1a9967d92b96115a982d..a6f354d6b0dc1d75762fd2ef48d5d9191694bbd6 100644 (file)
@@ -25,6 +25,7 @@ class KeyGenService(unittest.TestCase, testutil.PollMixin):
         t.setServiceParent(self.parent)
         t.listenOn("tcp:0")
         t.setLocationAutomatically()
+        return eventual.fireEventually()
 
     def tearDown(self):
         d = self.parent.stopService()
index 9250371b43081d9051f620617a6c37c070481b88..cb089ee2e26eedad9fd378a588cf2f46d89352ad 100644 (file)
@@ -22,7 +22,6 @@
 <h2>Retrieve Results</h2>
 <ul>
   <li n:render="encoding" />
-  <li n:render="peers_queried" />
   <li n:render="problems" />
   <li n:render="sharemap" />
   <li>Timings:</li>
     (<span n:render="rate" n:data="rate_total" />)</li>
     <ul>
       <li>Setup: <span n:render="time" n:data="time_setup" /></li>
-      <li>Initial Version Query: <span n:render="time" n:data="time_query" />
-      (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
-      <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
-      <ul>
-        <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
-      </ul></li>
       <li>Encrypting: <span n:render="time" n:data="time_encrypt" />
       (<span n:render="rate" n:data="rate_encrypt" />)</li>
       <li>Encoding: <span n:render="time" n:data="time_encode" />
index fe0b9e330727665b8ec5b65de7a7bb031db8fb66..52a7b18019bad862eb1e81bee0fbbbab7e6ad06b 100644 (file)
 <h2>Retrieve Results</h2>
 <ul>
   <li n:render="encoding" />
-  <li n:render="search_distance" />
   <li n:render="problems" />
   <li>Timings:</li>
   <ul>
     <li>Total: <span n:render="time" n:data="time_total" />
     (<span n:render="rate" n:data="rate_total" />)</li>
     <ul>
-      <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
       <li>Fetching: <span n:render="time" n:data="time_fetch" />
-      (<span n:render="rate" n:data="rate_fetch" />)
-      <ul>
-        <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
-      </ul></li>
+      (<span n:render="rate" n:data="rate_fetch" />)</li>
       <li>Decoding: <span n:render="time" n:data="time_decode" />
       (<span n:render="rate" n:data="rate_decode" />)</li>
       <li>Decrypting: <span n:render="time" n:data="time_decrypt" />
index 06e9f0101ad42b1d3097678ce8813acccab9caf8..2e7788836b207faac4c9c945e9a4707970c0a82b 100644 (file)
@@ -449,10 +449,6 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]
 
-    def render_search_distance(self, ctx, data):
-        d = data.get_search_distance()
-        return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
-
     def render_problems(self, ctx, data):
         problems = data.problems
         if not problems:
@@ -553,19 +549,16 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
         k, n = data.get_encoding()
         return ctx.tag["Encoding: %s of %s" % (k, n)]
 
-    def render_peers_queried(self, ctx, data):
-        return ctx.tag["Peers Queried: ", data.peers_queried]
-
     def render_sharemap(self, ctx, data):
-        sharemap = data.sharemap
-        if sharemap is None:
+        servermap = data.get_servermap()
+        if servermap is None:
             return ctx.tag["None"]
         l = T.ul()
+        sharemap = servermap.make_sharemap()
         for shnum in sorted(sharemap.keys()):
             l[T.li["%d -> Placed on " % shnum,
                    ", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
-                              for (peerid,seqnum,root_hash)
-                              in sharemap[shnum]])]]
+                              for peerid in sharemap[shnum]])]]
         return ctx.tag["Sharemap:", l]
 
     def render_problems(self, ctx, data):
@@ -596,21 +589,6 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_time_setup(self, ctx, data):
         return self.publish_status.timings.get("setup")
 
-    def data_time_query(self, ctx, data):
-        return self.publish_status.timings.get("query")
-
-    def data_time_privkey(self, ctx, data):
-        return self.publish_status.timings.get("privkey")
-
-    def data_time_privkey_fetch(self, ctx, data):
-        return self.publish_status.timings.get("privkey_fetch")
-    def render_privkey_from(self, ctx, data):
-        peerid = data.privkey_from
-        if peerid:
-            return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
-        else:
-            return ""
-
     def data_time_encrypt(self, ctx, data):
         return self.publish_status.timings.get("encrypt")
     def data_rate_encrypt(self, ctx, data):
@@ -633,23 +611,15 @@ class PublishStatusPage(rend.Page, RateAndTimeMixin):
     def data_rate_push(self, ctx, data):
         return self._get_rate(data, "push")
 
-    def data_initial_read_size(self, ctx, data):
-        return self.publish_status.initial_read_size
-
     def render_server_timings(self, ctx, data):
-        per_server = self.publish_status.timings.get("per_server")
+        per_server = self.publish_status.timings.get("send_per_server")
         if not per_server:
             return ""
         l = T.ul()
         for peerid in sorted(per_server.keys()):
             peerid_s = idlib.shortnodeid_b2a(peerid)
-            times = []
-            for op,t in per_server[peerid]:
-                if op == "read":
-                    times.append( "(" + self.render_time(None, t) + ")" )
-                else:
-                    times.append( self.render_time(None, t) )
-            times_s = ", ".join(times)
+            times_s = ", ".join([self.render_time(None, t)
+                                 for t in per_server[peerid]])
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]
 