lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(),
level=log.NOISY, umid="Io7pyg")
time_sent = now()
- d_ev = self._download_status.add_dyhb_sent(server.get_serverid(),
- time_sent)
+ d_ev = self._download_status.add_dyhb_request(server.get_serverid(),
+ time_sent)
# TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
self.overdue, req)
eventually(self.share_consumer.got_shares, shares)
def _got_error(self, f, server, req, d_ev, lp):
- d_ev.finished("error", now())
+ d_ev.error(now())
self.log(format="got error from [%(name)s]",
name=server.name(), failure=f,
level=log.UNUSUAL, parent=lp, umid="zUKdCw")
import time
now = time.time
+from zope.interface import Interface
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.api import eventually
from segmentation import Segmentation
from common import BadCiphertextHashError
+class IDownloadStatusHandlingConsumer(Interface):
+ def set_download_status_read_event(read_ev):
+ """Record the DownloadStatus 'read event', to be updated with the
+ time it takes to decrypt each chunk of data."""
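+        # usage sketch (illustrative): filenode.DecryptingConsumer provides
+        # this interface, and reports the time spent decrypting each chunk
+        # with read_ev.update(0, elapsed, 0)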
+
class Cancel:
def __init__(self, f):
self._f = f
# things to track callers that want data
# _segment_requests can have duplicates
- self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
+ self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
self._active_segment = None # a SegmentFetcher, with .segnum
self._segsize_observers = observer.OneShotObserverList()
# things called by outside callers, via CiphertextFileNode. get_segment()
# may also be called by Segmentation.
- def read(self, consumer, offset=0, size=None, read_ev=None):
+ def read(self, consumer, offset, size):
"""I am the main entry point, from which FileNode.read() can get
data. I feed the consumer with the desired range of ciphertext. I
return a Deferred that fires (with the consumer) when the read is
finished.
Note that there is no notion of a 'file pointer': each call to read()
- uses an independent offset= value."""
+ uses an independent offset= value.
+ """
# for concurrent operations: each gets its own Segmentation manager
if size is None:
size = self._verifycap.size
# ignore overruns: clip size so offset+size does not go past EOF, and
# so size is not negative (which indicates that offset >= EOF)
size = max(0, min(size, self._verifycap.size-offset))
- if read_ev is None:
- read_ev = self._download_status.add_read_event(offset, size, now())
+
+ read_ev = self._download_status.add_read_event(offset, size, now())
+ if IDownloadStatusHandlingConsumer.providedBy(consumer):
+ consumer.set_download_status_read_event(read_ev)
lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
si=base32.b2a(self._verifycap.storage_index)[:8],
read_ev.finished(now())
# no data, so no producer, so no register/unregisterProducer
return defer.succeed(consumer)
+
+ # for concurrent operations, each read() gets its own Segmentation
+ # manager
s = Segmentation(self, offset, size, consumer, read_ev, lp)
+
# this raises an interesting question: what segments to fetch? if
# offset=0, always fetch the first segment, and then allow
# Segmentation to be responsible for pulling the subsequent ones if
si=base32.b2a(self._verifycap.storage_index)[:8],
segnum=segnum,
level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
- self._download_status.add_segment_request(segnum, now())
+ seg_ev = self._download_status.add_segment_request(segnum, now())
d = defer.Deferred()
c = Cancel(self._cancel_request)
- self._segment_requests.append( (segnum, d, c, lp) )
+ self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
self._start_new_segment()
return (d, c)
def _start_new_segment(self):
if self._active_segment is None and self._segment_requests:
- segnum = self._segment_requests[0][0]
+ (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
k = self._verifycap.needed_shares
- lp = self._segment_requests[0][3]
log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
node=repr(self), segnum=segnum,
level=log.NOISY, parent=lp, umid="wAlnHQ")
self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
+ seg_ev.activate(now())
active_shares = [s for s in self._shares if s.is_alive()]
fetcher.add_shares(active_shares) # this triggers the loop
def fetch_failed(self, sf, f):
assert sf is self._active_segment
# deliver error upwards
- for (d,c) in self._extract_requests(sf.segnum):
+ for (d,c,seg_ev) in self._extract_requests(sf.segnum):
+ seg_ev.error(now())
eventually(self._deliver, d, c, f)
self._active_segment = None
self._start_new_segment()
d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
d.addCallback(self._check_ciphertext_hash, segnum)
def _deliver(result):
- ds = self._download_status
- if isinstance(result, Failure):
- ds.add_segment_error(segnum, now())
- else:
- (offset, segment, decodetime) = result
- ds.add_segment_delivery(segnum, now(),
- offset, len(segment), decodetime)
log.msg(format="delivering segment(%(segnum)d)",
segnum=segnum,
level=log.OPERATIONAL, parent=self._lp,
umid="j60Ojg")
- for (d,c) in self._extract_requests(segnum):
- eventually(self._deliver, d, c, result)
+ when = now()
+ if isinstance(result, Failure):
+ # this catches failures in decode or ciphertext hash
+ for (d,c,seg_ev) in self._extract_requests(segnum):
+ seg_ev.error(when)
+ eventually(self._deliver, d, c, result)
+ else:
+ (offset, segment, decodetime) = result
+ for (d,c,seg_ev) in self._extract_requests(segnum):
+ # when we have two requests for the same segment, the
+ # second one will not be "activated" before the data is
+ # delivered, so to allow the status-reporting code to see
+ # consistent behavior, we activate them all now. The
+ # SegmentEvent will ignore duplicate activate() calls.
+ # Note that this will result in an inaccurate "receive
+ # speed" for the second request.
+ seg_ev.activate(when)
+ seg_ev.deliver(when, offset, len(segment), decodetime)
+ eventually(self._deliver, d, c, result)
self._active_segment = None
self._start_new_segment()
d.addBoth(_deliver)
- d.addErrback(lambda f:
- log.err("unhandled error during process_blocks",
- failure=f, level=log.WEIRD,
- parent=self._lp, umid="MkEsCg"))
+ d.addErrback(log.err, "unhandled error during process_blocks",
+ level=log.WEIRD, parent=self._lp, umid="MkEsCg")
def _decode_blocks(self, segnum, blocks):
tail = (segnum == self.num_segments-1)
def _extract_requests(self, segnum):
"""Remove matching requests and return their (d,c) tuples so that the
caller can retire them."""
- retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
+ retire = [(d,c,seg_ev)
+ for (segnum0,d,c,seg_ev,lp) in self._segment_requests
if segnum0 == segnum]
self._segment_requests = [t for t in self._segment_requests
if t[0] != segnum]
def _cancel_request(self, c):
self._segment_requests = [t for t in self._segment_requests
if t[2] != c]
- segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
+ segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
# the consumer might call our .pauseProducing() inside that write()
# call, setting self._hungry=False
self._read_ev.update(len(desired_data), 0, 0)
+ # note: filenode.DecryptingConsumer is responsible for calling
+ # _read_ev.update with how much decrypt_time was consumed
self._maybe_fetch_next()
def _retry_bad_segment(self, f):
share=repr(self),
start=start, length=length,
level=log.NOISY, parent=self._lp, umid="sgVAyA")
- req_ev = ds.add_request_sent(self._server.get_serverid(),
- self._shnum,
- start, length, now())
+ block_ev = ds.add_block_request(self._server.get_serverid(),
+ self._shnum,
+ start, length, now())
d = self._send_request(start, length)
- d.addCallback(self._got_data, start, length, req_ev, lp)
- d.addErrback(self._got_error, start, length, req_ev, lp)
+ d.addCallback(self._got_data, start, length, block_ev, lp)
+ d.addErrback(self._got_error, start, length, block_ev, lp)
d.addCallback(self._trigger_loop)
d.addErrback(lambda f:
log.err(format="unhandled error during send_request",
def _send_request(self, start, length):
return self._rref.callRemote("read", start, length)
- def _got_data(self, data, start, length, req_ev, lp):
- req_ev.finished(len(data), now())
+ def _got_data(self, data, start, length, block_ev, lp):
+ block_ev.finished(len(data), now())
if not self._alive:
return
log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
# the wanted/needed span is only "wanted" for the first pass. Once
# the offset table arrives, it's all "needed".
- def _got_error(self, f, start, length, req_ev, lp):
- req_ev.finished("error", now())
+ def _got_error(self, f, start, length, block_ev, lp):
+ block_ev.error(now())
log.msg(format="error requesting %(start)d+%(length)d"
" from %(server)s for si %(si)s",
start=start, length=length,
from zope.interface import implements
from allmydata.interfaces import IDownloadStatus
-class RequestEvent:
- def __init__(self, download_status, tag):
- self._download_status = download_status
- self._tag = tag
- def finished(self, received, when):
- self._download_status.add_request_finished(self._tag, received, when)
+class ReadEvent:
+ def __init__(self, ev, ds):
+ self._ev = ev
+ self._ds = ds
+ def update(self, bytes, decrypttime, pausetime):
+ self._ev["bytes_returned"] += bytes
+ self._ev["decrypt_time"] += decrypttime
+ self._ev["paused_time"] += pausetime
+ def finished(self, finishtime):
+ self._ev["finish_time"] = finishtime
+ self._ds.update_last_timestamp(finishtime)
+
+class SegmentEvent:
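+    # lifecycle (as driven by node.py): activate() when a SegmentFetcher
+    # starts work on the segment, then deliver() or error() when it is
+    # resolved; duplicate activate() calls are ignored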
+ def __init__(self, ev, ds):
+ self._ev = ev
+ self._ds = ds
+ def activate(self, when):
+ if self._ev["active_time"] is None:
+ self._ev["active_time"] = when
+ def deliver(self, when, start, length, decodetime):
+ assert self._ev["active_time"] is not None
+ self._ev["finish_time"] = when
+ self._ev["success"] = True
+ self._ev["decode_time"] = decodetime
+ self._ev["segment_start"] = start
+ self._ev["segment_length"] = length
+ self._ds.update_last_timestamp(when)
+ def error(self, when):
+ self._ev["finish_time"] = when
+ self._ev["success"] = False
+ self._ds.update_last_timestamp(when)
class DYHBEvent:
- def __init__(self, download_status, tag):
- self._download_status = download_status
- self._tag = tag
+ def __init__(self, ev, ds):
+ self._ev = ev
+ self._ds = ds
+ def error(self, when):
+ self._ev["finish_time"] = when
+ self._ev["success"] = False
+ self._ds.update_last_timestamp(when)
def finished(self, shnums, when):
- self._download_status.add_dyhb_finished(self._tag, shnums, when)
+ self._ev["finish_time"] = when
+ self._ev["success"] = True
+ self._ev["response_shnums"] = shnums
+ self._ds.update_last_timestamp(when)
+
+class BlockRequestEvent:
+ def __init__(self, ev, ds):
+ self._ev = ev
+ self._ds = ds
+ def finished(self, received, when):
+ self._ev["finish_time"] = when
+ self._ev["success"] = True
+ self._ev["response_length"] = received
+ self._ds.update_last_timestamp(when)
+ def error(self, when):
+ self._ev["finish_time"] = when
+ self._ev["success"] = False
+ self._ds.update_last_timestamp(when)
-class ReadEvent:
- def __init__(self, download_status, tag):
- self._download_status = download_status
- self._tag = tag
- def update(self, bytes, decrypttime, pausetime):
- self._download_status.update_read_event(self._tag, bytes,
- decrypttime, pausetime)
- def finished(self, finishtime):
- self._download_status.finish_read_event(self._tag, finishtime)
class DownloadStatus:
# There is one DownloadStatus for each CiphertextFileNode. The status
self.size = size
self.counter = self.statusid_counter.next()
self.helper = False
- self.started = None
- # self.dyhb_requests tracks "do you have a share" requests and
- # responses. It maps serverid to a tuple of:
- # send time
- # tuple of response shnums (None if response hasn't arrived, "error")
- # response time (None if response hasn't arrived yet)
- self.dyhb_requests = {}
-
- # self.requests tracks share-data requests and responses. It maps
- # serverid to a tuple of:
- # shnum,
- # start,length, (of data requested)
- # send time
- # response length (None if reponse hasn't arrived yet, or "error")
- # response time (None if response hasn't arrived)
- self.requests = {}
-
- # self.segment_events tracks segment requests and delivery. It is a
- # list of:
- # type ("request", "delivery", "error")
- # segment number
- # event time
- # segment start (file offset of first byte, None except in "delivery")
- # segment length (only in "delivery")
- # time spent in decode (only in "delivery")
- self.segment_events = []
- # self.read_events tracks read() requests. It is a list of:
+ self.first_timestamp = None
+ self.last_timestamp = None
+
+ # all four of these _events lists are sorted by start_time, because
+ # they are strictly append-only (some elements are later mutated in
+ # place, but none are removed or inserted in the middle).
+
+ # self.read_events tracks read() requests. It is a list of dicts,
+ # each with the following keys:
# start,length (of data requested)
- # request time
- # finish time (None until finished)
- # bytes returned (starts at 0, grows as segments are delivered)
- # time spent in decrypt (None for ciphertext-only reads)
- # time spent paused
+ # start_time
+ # finish_time (None until finished)
+ # bytes_returned (starts at 0, grows as segments are delivered)
+ # decrypt_time (time spent in decrypt, None for ciphertext-only reads)
+ # paused_time (time spent paused by client via pauseProducing)
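+        #
+        # a finished read event might look like this (illustrative values):
+        #  { "start": 0, "length": 65536, "start_time": 1.0,
+        #    "finish_time": 1.5, "bytes_returned": 65536,
+        #    "decrypt_time": 0.01, "paused_time": 0.0 }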
self.read_events = []
+ # self.segment_events tracks segment requests and their resolution.
+ # It is a list of dicts:
+ # segment_number
+ # start_time
+ # active_time (None until work has begun)
+        #  decode_time (time spent in decode, None until delivered)
+ # finish_time (None until resolved)
+ # success (None until resolved, then boolean)
+ # segment_start (file offset of first byte, None until delivered)
+ # segment_length (None until delivered)
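+        #
+        # a delivered segment might look like this (illustrative values):
+        #  { "segment_number": 0, "start_time": 1.0, "active_time": 1.1,
+        #    "finish_time": 1.6, "success": True, "decode_time": 0.2,
+        #    "segment_start": 0, "segment_length": 131072 }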
+ self.segment_events = []
+
+ # self.dyhb_requests tracks "do you have a share" requests and
+ # responses. It is a list of dicts:
+ # serverid (binary)
+ # start_time
+ # success (None until resolved, then boolean)
+ # response_shnums (tuple, None until successful)
+ # finish_time (None until resolved)
+ self.dyhb_requests = []
+
+ # self.block_requests tracks share-data requests and responses. It is
+ # a list of dicts:
+ # serverid (binary),
+ # shnum,
+ # start,length, (of data requested)
+ # start_time
+ # finish_time (None until resolved)
+ # success (None until resolved, then bool)
+ # response_length (None until success)
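+        #
+        # a successful block request might look like this (illustrative
+        # values):
+        #  { "serverid": serverid, "shnum": 0, "start": 0, "length": 32768,
+        #    "start_time": 1.0, "finish_time": 1.2, "success": True,
+        #    "response_length": 32768 }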
+ self.block_requests = []
+
self.known_shares = [] # (serverid, shnum)
self.problems = []
- def add_dyhb_sent(self, serverid, when):
- r = (when, None, None)
- if serverid not in self.dyhb_requests:
- self.dyhb_requests[serverid] = []
- self.dyhb_requests[serverid].append(r)
- tag = (serverid, len(self.dyhb_requests[serverid])-1)
- return DYHBEvent(self, tag)
-
- def add_dyhb_finished(self, tag, shnums, when):
- # received="error" on error, else tuple(shnums)
- (serverid, index) = tag
- r = self.dyhb_requests[serverid][index]
- (sent, _, _) = r
- r = (sent, shnums, when)
- self.dyhb_requests[serverid][index] = r
-
- def add_request_sent(self, serverid, shnum, start, length, when):
- r = (shnum, start, length, when, None, None)
- if serverid not in self.requests:
- self.requests[serverid] = []
- self.requests[serverid].append(r)
- tag = (serverid, len(self.requests[serverid])-1)
- return RequestEvent(self, tag)
-
- def add_request_finished(self, tag, received, when):
- # received="error" on error, else len(data)
- (serverid, index) = tag
- r = self.requests[serverid][index]
- (shnum, start, length, sent, _, _) = r
- r = (shnum, start, length, sent, received, when)
- self.requests[serverid][index] = r
+ def add_read_event(self, start, length, when):
+ if self.first_timestamp is None:
+ self.first_timestamp = when
+ r = { "start": start,
+ "length": length,
+ "start_time": when,
+ "finish_time": None,
+ "bytes_returned": 0,
+ "decrypt_time": 0,
+ "paused_time": 0,
+ }
+ self.read_events.append(r)
+ return ReadEvent(r, self)
def add_segment_request(self, segnum, when):
- if self.started is None:
- self.started = when
- r = ("request", segnum, when, None, None, None)
- self.segment_events.append(r)
- def add_segment_delivery(self, segnum, when, start, length, decodetime):
- r = ("delivery", segnum, when, start, length, decodetime)
- self.segment_events.append(r)
- def add_segment_error(self, segnum, when):
- r = ("error", segnum, when, None, None, None)
+ if self.first_timestamp is None:
+ self.first_timestamp = when
+ r = { "segment_number": segnum,
+ "start_time": when,
+ "active_time": None,
+ "finish_time": None,
+ "success": None,
+ "decode_time": None,
+ "segment_start": None,
+ "segment_length": None,
+ }
self.segment_events.append(r)
+ return SegmentEvent(r, self)
- def add_read_event(self, start, length, when):
- if self.started is None:
- self.started = when
- r = (start, length, when, None, 0, 0, 0)
- self.read_events.append(r)
- tag = len(self.read_events)-1
- return ReadEvent(self, tag)
- def update_read_event(self, tag, bytes_d, decrypt_d, paused_d):
- r = self.read_events[tag]
- (start, length, requesttime, finishtime, bytes, decrypt, paused) = r
- bytes += bytes_d
- decrypt += decrypt_d
- paused += paused_d
- r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
- self.read_events[tag] = r
- def finish_read_event(self, tag, finishtime):
- r = self.read_events[tag]
- (start, length, requesttime, _, bytes, decrypt, paused) = r
- r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
- self.read_events[tag] = r
+ def add_dyhb_request(self, serverid, when):
+ r = { "serverid": serverid,
+ "start_time": when,
+ "success": None,
+ "response_shnums": None,
+ "finish_time": None,
+ }
+ self.dyhb_requests.append(r)
+ return DYHBEvent(r, self)
+
+ def add_block_request(self, serverid, shnum, start, length, when):
+ r = { "serverid": serverid,
+ "shnum": shnum,
+ "start": start,
+ "length": length,
+ "start_time": when,
+ "finish_time": None,
+ "success": None,
+ "response_length": None,
+ }
+ self.block_requests.append(r)
+ return BlockRequestEvent(r, self)
+
+ def update_last_timestamp(self, when):
+ if self.last_timestamp is None or when > self.last_timestamp:
+ self.last_timestamp = when
def add_known_share(self, serverid, shnum):
self.known_shares.append( (serverid, shnum) )
# mention all outstanding segment requests
-        outstanding = set()
-        errorful = set()
- for s_ev in self.segment_events:
- (etype, segnum, when, segstart, seglen, decodetime) = s_ev
- if etype == "request":
- outstanding.add(segnum)
- elif etype == "delivery":
- outstanding.remove(segnum)
- else: # "error"
- outstanding.remove(segnum)
- errorful.add(segnum)
+ outstanding = set([s_ev["segment_number"]
+ for s_ev in self.segment_events
+ if s_ev["finish_time"] is None])
+ errorful = set([s_ev["segment_number"]
+ for s_ev in self.segment_events
+ if s_ev["success"] is False])
def join(segnums):
if len(segnums) == 1:
return "segment %s" % list(segnums)[0]
return 0.0
total_outstanding, total_received = 0, 0
for r_ev in self.read_events:
- (start, length, ign1, finishtime, bytes, ign2, ign3) = r_ev
- if finishtime is None:
- total_outstanding += length
- total_received += bytes
+ if r_ev["finish_time"] is None:
+ total_outstanding += r_ev["length"]
+ total_received += r_ev["bytes_returned"]
# else ignore completed requests
if not total_outstanding:
return 1.0
return False
def get_started(self):
- return self.started
+ return self.first_timestamp
def get_results(self):
return None # TODO
import copy
import time
now = time.time
-from zope.interface import implements, Interface
+from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer
# local imports
from allmydata.immutable.checker import Checker
from allmydata.immutable.repairer import Repairer
-from allmydata.immutable.downloader.node import DownloadNode
+from allmydata.immutable.downloader.node import DownloadNode, \
+ IDownloadStatusHandlingConsumer
from allmydata.immutable.downloader.status import DownloadStatus
-class IDownloadStatusHandlingConsumer(Interface):
- def set_download_status_read_event(read_ev):
- """Record the DownloadStatus 'read event', to be updated with the
- time it takes to decrypt each chunk of data."""
-
class CiphertextFileNode:
def __init__(self, verifycap, storage_broker, secret_holder,
terminator, history):
return a Deferred that fires (with the consumer) when the read is
finished."""
self._maybe_create_download_node()
- actual_size = size
- if actual_size is None:
- actual_size = self._verifycap.size - offset
- read_ev = self._download_status.add_read_event(offset, actual_size,
- now())
- if IDownloadStatusHandlingConsumer.providedBy(consumer):
- consumer.set_download_status_read_event(read_ev)
- return self._node.read(consumer, offset, size, read_ev)
+ return self._node.read(consumer, offset, size)
def get_segment(self, segnum):
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a
def __init__(self, consumer, readkey, offset):
self._consumer = consumer
- self._read_event = None
+ self._read_ev = None
# TODO: pycryptopp CTR-mode needs random-access operations: I want
# either a=AES(readkey, offset) or better yet both of:
# a=AES(readkey, offset=0)
self._decryptor.process("\x00"*offset_small)
def set_download_status_read_event(self, read_ev):
- self._read_event = read_ev
+ self._read_ev = read_ev
def registerProducer(self, producer, streaming):
# this passes through, so the real consumer can flow-control the real
def write(self, ciphertext):
started = now()
plaintext = self._decryptor.process(ciphertext)
- if self._read_event:
+ if self._read_ev:
elapsed = now() - started
- self._read_event.update(0, elapsed, 0)
+ self._read_ev.update(0, elapsed, 0)
self._consumer.write(plaintext)
class ImmutableFileNode:
now = 12345.1
ds = DownloadStatus("si-1", 123)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(0, now)
+ ev0 = ds.add_segment_request(0, now)
self.failUnlessEqual(ds.get_status(), "fetching segment 0")
- ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+ ev0.activate(now+0.5)
+ ev0.deliver(now+1, 0, 1000, 2.0)
self.failUnlessEqual(ds.get_status(), "idle")
- ds.add_segment_request(2, now+2)
- ds.add_segment_request(1, now+2)
+ ev2 = ds.add_segment_request(2, now+2)
+ del ev2 # hush pyflakes
+ ev1 = ds.add_segment_request(1, now+2)
self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
- ds.add_segment_error(1, now+3)
+ ev1.error(now+3)
self.failUnlessEqual(ds.get_status(),
"fetching segment 2; errors on segment 1")