From: Brian Warner Date: Wed, 4 Aug 2010 07:27:02 +0000 (-0700) Subject: Rewrite immutable downloader (#798). This patch includes higher-level X-Git-Tag: allmydata-tahoe-1.8.0b2~19 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/frontends/?a=commitdiff_plain;h=7b7b0c9709d8ade6ab9fe90b9e0a719308f25f50;p=tahoe-lafs%2Ftahoe-lafs.git Rewrite immutable downloader (#798). This patch includes higher-level integration into the NodeMaker, and updates the web-status display to handle the new download events. --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index d4b61727..fa515d41 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,9 +1,10 @@ -import os, stat, time +import os, stat, time, weakref from allmydata.interfaces import RIStorageServer from allmydata import node from zope.interface import implements from twisted.internet import reactor, defer +from twisted.application import service from twisted.application.internet import TimerService from foolscap.api import Referenceable from pycryptopp.publickey import rsa @@ -12,11 +13,10 @@ import allmydata from allmydata.storage.server import StorageServer from allmydata import storage_client from allmydata.immutable.upload import Uploader -from allmydata.immutable.download import Downloader from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, pollmixin, cachedir, log +from allmydata.util import hashutil, base32, pollmixin, log from allmydata.util.encodingutil import get_filesystem_encoding from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date @@ -95,6 +95,16 @@ class KeyGenerator: verifier = signer.get_verifying_key() return defer.succeed( (verifier, signer) ) +class Terminator(service.Service): + def __init__(self): + self._clients = weakref.WeakKeyDictionary() + def register(self, c): + self._clients[c] = None + def stopService(self): + for c in self._clients: + c.stop() + return service.Service.stopService(self) + class Client(node.Node, pollmixin.PollMixin): implements(IStatsProducer) @@ -279,12 +289,9 @@ class Client(node.Node, pollmixin.PollMixin): self.init_client_storage_broker() self.history = History(self.stats_provider) + self.terminator = Terminator() + self.terminator.setServiceParent(self) self.add_service(Uploader(helper_furl, self.stats_provider)) - download_cachedir = os.path.join(self.basedir, - "private", "cache", "download") - self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir) - self.download_cache_dirman.setServiceParent(self) - self.downloader = Downloader(self.storage_broker, self.stats_provider) self.init_stub_client() self.init_nodemaker() @@ -343,8 +350,7 @@ class Client(node.Node, pollmixin.PollMixin): self._secret_holder, self.get_history(), self.getServiceNamed("uploader"), - self.downloader, - self.download_cache_dirman, + self.terminator, self.get_encoding_parameters(), self._key_generator) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 4cfe9c90..3a7fa7fa 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -24,6 +24,9 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests +KiB = 1024 +DEFAULT_MAX_SEGMENT_SIZE = 128*KiB + class RIStubClient(RemoteInterface): """Each client publishes a service announcement for a dummy object called the StubClient. This object doesn't actually offer any services, but the diff --git a/src/allmydata/nodemaker.py b/src/allmydata/nodemaker.py index c852f688..3b74d907 100644 --- a/src/allmydata/nodemaker.py +++ b/src/allmydata/nodemaker.py @@ -1,7 +1,8 @@ import weakref from zope.interface import implements from allmydata.interfaces import INodeMaker -from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode +from allmydata.immutable.literal import LiteralFileNode +from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode from allmydata.immutable.upload import Data from allmydata.mutable.filenode import MutableFileNode from allmydata.dirnode import DirectoryNode, pack_children @@ -12,14 +13,13 @@ class NodeMaker: implements(INodeMaker) def __init__(self, storage_broker, secret_holder, history, - uploader, downloader, download_cache_dirman, + uploader, terminator, default_encoding_parameters, key_generator): self.storage_broker = storage_broker self.secret_holder = secret_holder self.history = history self.uploader = uploader - self.downloader = downloader - self.download_cache_dirman = download_cache_dirman + self.terminator = terminator self.default_encoding_parameters = default_encoding_parameters self.key_generator = key_generator @@ -29,8 +29,10 @@ class NodeMaker: return LiteralFileNode(cap) def _create_immutable(self, cap): return ImmutableFileNode(cap, self.storage_broker, self.secret_holder, - self.downloader, self.history, - self.download_cache_dirman) + self.terminator, self.history) + def _create_immutable_verifier(self, cap): + return CiphertextFileNode(cap, self.storage_broker, self.secret_holder, + self.terminator, self.history) def _create_mutable(self, cap): n = MutableFileNode(self.storage_broker, self.secret_holder, self.default_encoding_parameters, @@ -73,6 +75,8 @@ class NodeMaker: return self._create_lit(cap) if isinstance(cap, uri.CHKFileURI): return self._create_immutable(cap) + if isinstance(cap, uri.CHKFileVerifierURI): + return self._create_immutable_verifier(cap) if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)): return self._create_mutable(cap) if isinstance(cap, (uri.DirectoryURI, diff --git a/src/allmydata/web/download-status.xhtml b/src/allmydata/web/download-status.xhtml index 77342ba5..30abfca4 100644 --- a/src/allmydata/web/download-status.xhtml +++ b/src/allmydata/web/download-status.xhtml @@ -18,6 +18,7 @@
  • Status:
  • +

    Download Results

    diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index e4241a32..c3a55d7e 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -358,6 +358,147 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): def download_results(self): return defer.maybeDeferred(self.download_status.get_results) + def relative_time(self, t): + if t is None: + return t + if self.download_status.started is not None: + return t - self.download_status.started + return t + def short_relative_time(self, t): + t = self.relative_time(t) + if t is None: + return "" + return "+%.6fs" % t + + def renderHTTP(self, ctx): + req = inevow.IRequest(ctx) + t = get_arg(req, "t") + if t == "json": + return self.json(req) + return rend.Page.renderHTTP(self, ctx) + + def json(self, req): + req.setHeader("content-type", "text/plain") + data = {} + dyhb_events = [] + for serverid,requests in self.download_status.dyhb_requests.iteritems(): + for req in requests: + dyhb_events.append( (base32.b2a(serverid),) + req ) + dyhb_events.sort(key=lambda req: req[1]) + data["dyhb"] = dyhb_events + request_events = [] + for serverid,requests in self.download_status.requests.iteritems(): + for req in requests: + request_events.append( (base32.b2a(serverid),) + req ) + request_events.sort(key=lambda req: (req[4],req[1])) + data["requests"] = request_events + data["segment"] = self.download_status.segment_events + data["read"] = self.download_status.read_events + return simplejson.dumps(data, indent=1) + "\n" + + def render_events(self, ctx, data): + if not self.download_status.storage_index: + return + srt = self.short_relative_time + l = T.ul() + + t = T.table(class_="status-download-events") + t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"], + T.td["shnums"], T.td["RTT"]]] + dyhb_events = [] + for serverid,requests in self.download_status.dyhb_requests.iteritems(): + for req in requests: + dyhb_events.append( (serverid,) + req ) + dyhb_events.sort(key=lambda req: req[1]) + for d_ev in dyhb_events: + (serverid, sent, shnums, received) = d_ev + serverid_s = idlib.shortnodeid_b2a(serverid) + rtt = received - sent + t[T.tr(style="background: %s" % self.color(serverid))[ + [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)], + T.td[",".join([str(shnum) for shnum in shnums])], + T.td[self.render_time(None, rtt)], + ]]] + l["DYHB Requests:", t] + + t = T.table(class_="status-download-events") + t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"], + T.td["time"], T.td["decrypttime"], T.td["pausedtime"], + T.td["speed"]]] + for r_ev in self.download_status.read_events: + (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev + print r_ev + if finishtime is not None: + rtt = finishtime - requesttime - paused + speed = self.render_rate(None, 1.0 * bytes / rtt) + rtt = self.render_time(None, rtt) + decrypt = self.render_time(None, decrypt) + paused = self.render_time(None, paused) + else: + speed, rtt, decrypt, paused = "","","","" + t[T.tr[T.td["[%d:+%d]" % (start, length)], + T.td[srt(requesttime)], T.td[srt(finishtime)], + T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused], + T.td[speed], + ]] + l["Read Events:", t] + + t = T.table(class_="status-download-events") + t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"], + T.td["decodetime"], T.td["segtime"], T.td["speed"]]] + reqtime = (None, None) + for s_ev in self.download_status.segment_events: + (etype, segnum, when, segstart, seglen, decodetime) = s_ev + if etype == "request": + t[T.tr[T.td["request"], T.td["seg%d" % segnum], + T.td[srt(when)]]] + reqtime = (segnum, when) + elif etype == "delivery": + if reqtime[0] == segnum: + segtime = when - reqtime[1] + speed = self.render_rate(None, 1.0 * seglen / segtime) + segtime = self.render_time(None, segtime) + else: + segtime, speed = "", "" + t[T.tr[T.td["delivery"], T.td["seg%d" % segnum], + T.td[srt(when)], + T.td["[%d:+%d]" % (segstart, seglen)], + T.td[self.render_time(None,decodetime)], + T.td[segtime], T.td[speed]]] + elif etype == "error": + t[T.tr[T.td["error"], T.td["seg%d" % segnum]]] + l["Segment Events:", t] + + t = T.table(border="1") + t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"], + T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]] + reqtime = (None, None) + request_events = [] + for serverid,requests in self.download_status.requests.iteritems(): + for req in requests: + request_events.append( (serverid,) + req ) + request_events.sort(key=lambda req: (req[4],req[1])) + for r_ev in request_events: + (peerid, shnum, start, length, sent, receivedlen, received) = r_ev + rtt = None + if received is not None: + rtt = received - sent + peerid_s = idlib.shortnodeid_b2a(peerid) + t[T.tr(style="background: %s" % self.color(peerid))[ + T.td[peerid_s], T.td[shnum], + T.td["[%d:+%d]" % (start, length)], + T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen], + T.td[self.render_time(None, rtt)], + ]] + l["Requests:", t] + + return l + + def color(self, peerid): + def m(c): + return min(ord(c) / 2 + 0x80, 0xff) + return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2])) + def render_results(self, ctx, data): d = self.download_results() def _got_results(results): @@ -371,7 +512,7 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): TIME_FORMAT = "%H:%M:%S %d-%b-%Y" started_s = time.strftime(TIME_FORMAT, time.localtime(data.get_started())) - return started_s + return started_s + " (%s)" % data.get_started() def render_si(self, ctx, data): si_s = base32.b2a_or_none(data.get_storage_index()) diff --git a/src/allmydata/web/tahoe.css b/src/allmydata/web/tahoe.css index a9aced6e..0ed83fc6 100644 --- a/src/allmydata/web/tahoe.css +++ b/src/allmydata/web/tahoe.css @@ -135,4 +135,14 @@ table.tahoe-directory { display: inline; text-align: center; padding: 0 1em; -} \ No newline at end of file +} + +/* recent upload/download status pages */ + +table.status-download-events { + border: 1px solid #aaa; +} +table.status-download-events td { + border: 1px solid #a00; + padding: 2px +}