-import os, stat, time
+import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node
from zope.interface import implements
from twisted.internet import reactor, defer
+from twisted.application import service
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
-from allmydata.immutable.download import Downloader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, cachedir, log
+from allmydata.util import hashutil, base32, pollmixin, log
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
verifier = signer.get_verifying_key()
return defer.succeed( (verifier, signer) )
+class Terminator(service.Service):
+ def __init__(self):
+ self._clients = weakref.WeakKeyDictionary()
+ def register(self, c):
+ self._clients[c] = None
+ def stopService(self):
+ for c in self._clients:
+ c.stop()
+ return service.Service.stopService(self)
+
class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
self.init_client_storage_broker()
self.history = History(self.stats_provider)
+ self.terminator = Terminator()
+ self.terminator.setServiceParent(self)
self.add_service(Uploader(helper_furl, self.stats_provider))
- download_cachedir = os.path.join(self.basedir,
- "private", "cache", "download")
- self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
- self.download_cache_dirman.setServiceParent(self)
- self.downloader = Downloader(self.storage_broker, self.stats_provider)
self.init_stub_client()
self.init_nodemaker()
self._secret_holder,
self.get_history(),
self.getServiceNamed("uploader"),
- self.downloader,
- self.download_cache_dirman,
+ self.terminator,
self.get_encoding_parameters(),
self._key_generator)
import weakref
from zope.interface import implements
from allmydata.interfaces import INodeMaker
-from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
+from allmydata.immutable.literal import LiteralFileNode
+from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode
from allmydata.immutable.upload import Data
from allmydata.mutable.filenode import MutableFileNode
from allmydata.dirnode import DirectoryNode, pack_children
implements(INodeMaker)
def __init__(self, storage_broker, secret_holder, history,
- uploader, downloader, download_cache_dirman,
+ uploader, terminator,
default_encoding_parameters, key_generator):
self.storage_broker = storage_broker
self.secret_holder = secret_holder
self.history = history
self.uploader = uploader
- self.downloader = downloader
- self.download_cache_dirman = download_cache_dirman
+ self.terminator = terminator
self.default_encoding_parameters = default_encoding_parameters
self.key_generator = key_generator
return LiteralFileNode(cap)
def _create_immutable(self, cap):
return ImmutableFileNode(cap, self.storage_broker, self.secret_holder,
- self.downloader, self.history,
- self.download_cache_dirman)
+ self.terminator, self.history)
+ def _create_immutable_verifier(self, cap):
+ return CiphertextFileNode(cap, self.storage_broker, self.secret_holder,
+ self.terminator, self.history)
def _create_mutable(self, cap):
n = MutableFileNode(self.storage_broker, self.secret_holder,
self.default_encoding_parameters,
return self._create_lit(cap)
if isinstance(cap, uri.CHKFileURI):
return self._create_immutable(cap)
+ if isinstance(cap, uri.CHKFileVerifierURI):
+ return self._create_immutable_verifier(cap)
if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)):
return self._create_mutable(cap)
if isinstance(cap, (uri.DirectoryURI,
def download_results(self):
return defer.maybeDeferred(self.download_status.get_results)
+ def relative_time(self, t):
+ if t is None:
+ return t
+ if self.download_status.started is not None:
+ return t - self.download_status.started
+ return t
+ def short_relative_time(self, t):
+ t = self.relative_time(t)
+ if t is None:
+ return ""
+ return "+%.6fs" % t
+
+ def renderHTTP(self, ctx):
+ req = inevow.IRequest(ctx)
+ t = get_arg(req, "t")
+ if t == "json":
+ return self.json(req)
+ return rend.Page.renderHTTP(self, ctx)
+
+ def json(self, req):
+ req.setHeader("content-type", "text/plain")
+ data = {}
+ dyhb_events = []
+ for serverid,requests in self.download_status.dyhb_requests.iteritems():
+ for req in requests:
+ dyhb_events.append( (base32.b2a(serverid),) + req )
+ dyhb_events.sort(key=lambda req: req[1])
+ data["dyhb"] = dyhb_events
+ request_events = []
+ for serverid,requests in self.download_status.requests.iteritems():
+ for req in requests:
+ request_events.append( (base32.b2a(serverid),) + req )
+ request_events.sort(key=lambda req: (req[4],req[1]))
+ data["requests"] = request_events
+ data["segment"] = self.download_status.segment_events
+ data["read"] = self.download_status.read_events
+ return simplejson.dumps(data, indent=1) + "\n"
+
+ def render_events(self, ctx, data):
+ if not self.download_status.storage_index:
+ return
+ srt = self.short_relative_time
+ l = T.ul()
+
+ t = T.table(class_="status-download-events")
+ t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"],
+ T.td["shnums"], T.td["RTT"]]]
+ dyhb_events = []
+ for serverid,requests in self.download_status.dyhb_requests.iteritems():
+ for req in requests:
+ dyhb_events.append( (serverid,) + req )
+ dyhb_events.sort(key=lambda req: req[1])
+ for d_ev in dyhb_events:
+ (serverid, sent, shnums, received) = d_ev
+ serverid_s = idlib.shortnodeid_b2a(serverid)
+ rtt = received - sent
+ t[T.tr(style="background: %s" % self.color(serverid))[
+ [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)],
+ T.td[",".join([str(shnum) for shnum in shnums])],
+ T.td[self.render_time(None, rtt)],
+ ]]]
+ l["DYHB Requests:", t]
+
+ t = T.table(class_="status-download-events")
+ t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"],
+ T.td["time"], T.td["decrypttime"], T.td["pausedtime"],
+ T.td["speed"]]]
+ for r_ev in self.download_status.read_events:
+ (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev
+ print r_ev
+ if finishtime is not None:
+ rtt = finishtime - requesttime - paused
+ speed = self.render_rate(None, 1.0 * bytes / rtt)
+ rtt = self.render_time(None, rtt)
+ decrypt = self.render_time(None, decrypt)
+ paused = self.render_time(None, paused)
+ else:
+ speed, rtt, decrypt, paused = "","","",""
+ t[T.tr[T.td["[%d:+%d]" % (start, length)],
+ T.td[srt(requesttime)], T.td[srt(finishtime)],
+ T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused],
+ T.td[speed],
+ ]]
+ l["Read Events:", t]
+
+ t = T.table(class_="status-download-events")
+ t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"],
+ T.td["decodetime"], T.td["segtime"], T.td["speed"]]]
+ reqtime = (None, None)
+ for s_ev in self.download_status.segment_events:
+ (etype, segnum, when, segstart, seglen, decodetime) = s_ev
+ if etype == "request":
+ t[T.tr[T.td["request"], T.td["seg%d" % segnum],
+ T.td[srt(when)]]]
+ reqtime = (segnum, when)
+ elif etype == "delivery":
+ if reqtime[0] == segnum:
+ segtime = when - reqtime[1]
+ speed = self.render_rate(None, 1.0 * seglen / segtime)
+ segtime = self.render_time(None, segtime)
+ else:
+ segtime, speed = "", ""
+ t[T.tr[T.td["delivery"], T.td["seg%d" % segnum],
+ T.td[srt(when)],
+ T.td["[%d:+%d]" % (segstart, seglen)],
+ T.td[self.render_time(None,decodetime)],
+ T.td[segtime], T.td[speed]]]
+ elif etype == "error":
+ t[T.tr[T.td["error"], T.td["seg%d" % segnum]]]
+ l["Segment Events:", t]
+
+ t = T.table(border="1")
+ t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"],
+ T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]]
+ reqtime = (None, None)
+ request_events = []
+ for serverid,requests in self.download_status.requests.iteritems():
+ for req in requests:
+ request_events.append( (serverid,) + req )
+ request_events.sort(key=lambda req: (req[4],req[1]))
+ for r_ev in request_events:
+ (peerid, shnum, start, length, sent, receivedlen, received) = r_ev
+ rtt = None
+ if received is not None:
+ rtt = received - sent
+ peerid_s = idlib.shortnodeid_b2a(peerid)
+ t[T.tr(style="background: %s" % self.color(peerid))[
+ T.td[peerid_s], T.td[shnum],
+ T.td["[%d:+%d]" % (start, length)],
+ T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen],
+ T.td[self.render_time(None, rtt)],
+ ]]
+ l["Requests:", t]
+
+ return l
+
+ def color(self, peerid):
+ def m(c):
+ return min(ord(c) / 2 + 0x80, 0xff)
+ return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2]))
+
def render_results(self, ctx, data):
d = self.download_results()
def _got_results(results):
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
started_s = time.strftime(TIME_FORMAT,
time.localtime(data.get_started()))
- return started_s
+ return started_s + " (%s)" % data.get_started()
def render_si(self, ctx, data):
si_s = base32.b2a_or_none(data.get_storage_index())