Rewrite immutable downloader (#798). This patch includes higher-level
authorBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:27:02 +0000 (00:27 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:27:02 +0000 (00:27 -0700)
integration into the NodeMaker, and updates the web-status display to handle
the new download events.

src/allmydata/client.py
src/allmydata/interfaces.py
src/allmydata/nodemaker.py
src/allmydata/web/download-status.xhtml
src/allmydata/web/status.py
src/allmydata/web/tahoe.css

index d4b6172761c531da8d299c685e2d0459c39f72d8..fa515d41a0895efb3eb67256320fe24da9025318 100644 (file)
@@ -1,9 +1,10 @@
-import os, stat, time
+import os, stat, time, weakref
 from allmydata.interfaces import RIStorageServer
 from allmydata import node
 
 from zope.interface import implements
 from twisted.internet import reactor, defer
+from twisted.application import service
 from twisted.application.internet import TimerService
 from foolscap.api import Referenceable
 from pycryptopp.publickey import rsa
@@ -12,11 +13,10 @@ import allmydata
 from allmydata.storage.server import StorageServer
 from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
-from allmydata.immutable.download import Downloader
 from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir, log
+from allmydata.util import hashutil, base32, pollmixin, log
 from allmydata.util.encodingutil import get_filesystem_encoding
 from allmydata.util.abbreviate import parse_abbreviated_size
 from allmydata.util.time_format import parse_duration, parse_date
@@ -95,6 +95,16 @@ class KeyGenerator:
             verifier = signer.get_verifying_key()
             return defer.succeed( (verifier, signer) )
 
+class Terminator(service.Service):
+    def __init__(self):
+        self._clients = weakref.WeakKeyDictionary()
+    def register(self, c):
+        self._clients[c] = None
+    def stopService(self):
+        for c in self._clients:
+            c.stop()
+        return service.Service.stopService(self)
+
 
 class Client(node.Node, pollmixin.PollMixin):
     implements(IStatsProducer)
@@ -279,12 +289,9 @@ class Client(node.Node, pollmixin.PollMixin):
 
         self.init_client_storage_broker()
         self.history = History(self.stats_provider)
+        self.terminator = Terminator()
+        self.terminator.setServiceParent(self)
         self.add_service(Uploader(helper_furl, self.stats_provider))
-        download_cachedir = os.path.join(self.basedir,
-                                         "private", "cache", "download")
-        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
-        self.download_cache_dirman.setServiceParent(self)
-        self.downloader = Downloader(self.storage_broker, self.stats_provider)
         self.init_stub_client()
         self.init_nodemaker()
 
@@ -343,8 +350,7 @@ class Client(node.Node, pollmixin.PollMixin):
                                    self._secret_holder,
                                    self.get_history(),
                                    self.getServiceNamed("uploader"),
-                                   self.downloader,
-                                   self.download_cache_dirman,
+                                   self.terminator,
                                    self.get_encoding_parameters(),
                                    self._key_generator)
 
index 4cfe9c903e4991e27d9e5a6cac2e4eb7b0096f7f..3a7fa7fa15d7f77534d1269d7517f0c263686cf2 100644 (file)
@@ -24,6 +24,9 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications
 LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
 LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
 
+KiB = 1024
+DEFAULT_MAX_SEGMENT_SIZE = 128*KiB
+
 class RIStubClient(RemoteInterface):
     """Each client publishes a service announcement for a dummy object called
     the StubClient. This object doesn't actually offer any services, but the
index c852f68888a675f7c9c47eaad63d25da729c6d06..3b74d907b047ae527bc25f042e20dc44a51744e8 100644 (file)
@@ -1,7 +1,8 @@
 import weakref
 from zope.interface import implements
 from allmydata.interfaces import INodeMaker
-from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
+from allmydata.immutable.literal import LiteralFileNode
+from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode
 from allmydata.immutable.upload import Data
 from allmydata.mutable.filenode import MutableFileNode
 from allmydata.dirnode import DirectoryNode, pack_children
@@ -12,14 +13,13 @@ class NodeMaker:
     implements(INodeMaker)
 
     def __init__(self, storage_broker, secret_holder, history,
-                 uploader, downloader, download_cache_dirman,
+                 uploader, terminator,
                  default_encoding_parameters, key_generator):
         self.storage_broker = storage_broker
         self.secret_holder = secret_holder
         self.history = history
         self.uploader = uploader
-        self.downloader = downloader
-        self.download_cache_dirman = download_cache_dirman
+        self.terminator = terminator
         self.default_encoding_parameters = default_encoding_parameters
         self.key_generator = key_generator
 
@@ -29,8 +29,10 @@ class NodeMaker:
         return LiteralFileNode(cap)
     def _create_immutable(self, cap):
         return ImmutableFileNode(cap, self.storage_broker, self.secret_holder,
-                                 self.downloader, self.history,
-                                 self.download_cache_dirman)
+                                 self.terminator, self.history)
+    def _create_immutable_verifier(self, cap):
+        return CiphertextFileNode(cap, self.storage_broker, self.secret_holder,
+                                  self.terminator, self.history)
     def _create_mutable(self, cap):
         n = MutableFileNode(self.storage_broker, self.secret_holder,
                             self.default_encoding_parameters,
@@ -73,6 +75,8 @@ class NodeMaker:
             return self._create_lit(cap)
         if isinstance(cap, uri.CHKFileURI):
             return self._create_immutable(cap)
+        if isinstance(cap, uri.CHKFileVerifierURI):
+            return self._create_immutable_verifier(cap)
         if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)):
             return self._create_mutable(cap)
         if isinstance(cap, (uri.DirectoryURI,
index 77342ba5d381b89e3d67fc57f6016daaa0ec82a6..30abfca4e0a54c7c7a2abfe077e7427bc0be3c9f 100644 (file)
@@ -18,6 +18,7 @@
   <li>Status: <span n:render="status"/></li>
 </ul>
 
+<div n:render="events"></div>
 
 <div n:render="results">
   <h2>Download Results</h2>
index e4241a32a4bd38017721da4c4efe5f0e606679c9..c3a55d7e91f6aa1310eaedf3ec4dc91965039c92 100644 (file)
@@ -358,6 +358,147 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
     def download_results(self):
         return defer.maybeDeferred(self.download_status.get_results)
 
+    def relative_time(self, t):
+        if t is None:
+            return t
+        if self.download_status.started is not None:
+            return t - self.download_status.started
+        return t
+    def short_relative_time(self, t):
+        t = self.relative_time(t)
+        if t is None:
+            return ""
+        return "+%.6fs" % t
+
+    def renderHTTP(self, ctx):
+        req = inevow.IRequest(ctx)
+        t = get_arg(req, "t")
+        if t == "json":
+            return self.json(req)
+        return rend.Page.renderHTTP(self, ctx)
+
+    def json(self, req):
+        req.setHeader("content-type", "text/plain")
+        data = {}
+        dyhb_events = []
+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
+            for req in requests:
+                dyhb_events.append( (base32.b2a(serverid),) + req )
+        dyhb_events.sort(key=lambda req: req[1])
+        data["dyhb"] = dyhb_events
+        request_events = []
+        for serverid,requests in self.download_status.requests.iteritems():
+            for req in requests:
+                request_events.append( (base32.b2a(serverid),) + req )
+        request_events.sort(key=lambda req: (req[4],req[1]))
+        data["requests"] = request_events
+        data["segment"] = self.download_status.segment_events
+        data["read"] = self.download_status.read_events
+        return simplejson.dumps(data, indent=1) + "\n"
+
+    def render_events(self, ctx, data):
+        if not self.download_status.storage_index:
+            return
+        srt = self.short_relative_time
+        l = T.ul()
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"],
+               T.td["shnums"], T.td["RTT"]]]
+        dyhb_events = []
+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
+            for req in requests:
+                dyhb_events.append( (serverid,) + req )
+        dyhb_events.sort(key=lambda req: req[1])
+        for d_ev in dyhb_events:
+            (serverid, sent, shnums, received) = d_ev
+            serverid_s = idlib.shortnodeid_b2a(serverid)
+            rtt = received - sent
+            t[T.tr(style="background: %s" % self.color(serverid))[
+                [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)],
+                 T.td[",".join([str(shnum) for shnum in shnums])],
+                 T.td[self.render_time(None, rtt)],
+                 ]]]
+        l["DYHB Requests:", t]
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"],
+               T.td["time"], T.td["decrypttime"], T.td["pausedtime"],
+               T.td["speed"]]]
+        for r_ev in self.download_status.read_events:
+            (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev
+            print r_ev
+            if finishtime is not None:
+                rtt = finishtime - requesttime - paused
+                speed = self.render_rate(None, 1.0 * bytes / rtt)
+                rtt = self.render_time(None, rtt)
+                decrypt = self.render_time(None, decrypt)
+                paused = self.render_time(None, paused)
+            else:
+                speed, rtt, decrypt, paused = "","","",""
+            t[T.tr[T.td["[%d:+%d]" % (start, length)],
+                   T.td[srt(requesttime)], T.td[srt(finishtime)],
+                   T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused],
+                   T.td[speed],
+                   ]]
+        l["Read Events:", t]
+
+        t = T.table(class_="status-download-events")
+        t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"],
+               T.td["decodetime"], T.td["segtime"], T.td["speed"]]]
+        reqtime = (None, None)
+        for s_ev in self.download_status.segment_events:
+            (etype, segnum, when, segstart, seglen, decodetime) = s_ev
+            if etype == "request":
+                t[T.tr[T.td["request"], T.td["seg%d" % segnum],
+                       T.td[srt(when)]]]
+                reqtime = (segnum, when)
+            elif etype == "delivery":
+                if reqtime[0] == segnum:
+                    segtime = when - reqtime[1]
+                    speed = self.render_rate(None, 1.0 * seglen / segtime)
+                    segtime = self.render_time(None, segtime)
+                else:
+                    segtime, speed = "", ""
+                t[T.tr[T.td["delivery"], T.td["seg%d" % segnum],
+                       T.td[srt(when)],
+                       T.td["[%d:+%d]" % (segstart, seglen)],
+                       T.td[self.render_time(None,decodetime)],
+                       T.td[segtime], T.td[speed]]]
+            elif etype == "error":
+                t[T.tr[T.td["error"], T.td["seg%d" % segnum]]]
+        l["Segment Events:", t]
+
+        t = T.table(border="1")
+        t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"],
+               T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]]
+        reqtime = (None, None)
+        request_events = []
+        for serverid,requests in self.download_status.requests.iteritems():
+            for req in requests:
+                request_events.append( (serverid,) + req )
+        request_events.sort(key=lambda req: (req[4],req[1]))
+        for r_ev in request_events:
+            (peerid, shnum, start, length, sent, receivedlen, received) = r_ev
+            rtt = None
+            if received is not None:
+                rtt = received - sent
+            peerid_s = idlib.shortnodeid_b2a(peerid)
+            t[T.tr(style="background: %s" % self.color(peerid))[
+                T.td[peerid_s], T.td[shnum],
+                T.td["[%d:+%d]" % (start, length)],
+                T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen],
+                T.td[self.render_time(None, rtt)],
+                ]]
+        l["Requests:", t]
+
+        return l
+
+    def color(self, peerid):
+        def m(c):
+            return min(ord(c) / 2 + 0x80, 0xff)
+        return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2]))
+
     def render_results(self, ctx, data):
         d = self.download_results()
         def _got_results(results):
@@ -371,7 +512,7 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page):
         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
         started_s = time.strftime(TIME_FORMAT,
                                   time.localtime(data.get_started()))
-        return started_s
+        return started_s + " (%s)" % data.get_started()
 
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
index a9aced6e3c4c1c1a9816eacf5c332384a7229b4e..0ed83fc61e06f827db044e9345d3e236a148d904 100644 (file)
@@ -135,4 +135,14 @@ table.tahoe-directory {
   display: inline;
   text-align: center;
   padding: 0 1em;
-}
\ No newline at end of file
+}
+
+/* recent upload/download status pages */
+
+table.status-download-events {
+  border: 1px solid #aaa;
+}
+table.status-download-events td {
+  border: 1px solid #a00;
+  padding: 2px
+}