prepare for viz: improve DownloadStatus events
authorBrian Warner <warner@lothar.com>
Wed, 29 Jun 2011 22:22:04 +0000 (15:22 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 29 Jun 2011 22:25:42 +0000 (15:25 -0700)
consolidate IDownloadStatusHandlingConsumer stuff into DownloadNode

src/allmydata/immutable/downloader/finder.py
src/allmydata/immutable/downloader/node.py
src/allmydata/immutable/downloader/segmentation.py
src/allmydata/immutable/downloader/share.py
src/allmydata/immutable/downloader/status.py
src/allmydata/immutable/filenode.py
src/allmydata/test/test_download.py

index c2f9c1a66544e508ddc4cd246bafa0b8666b52c1..0103a589e6993bbd0f2e83bb80cecfb2e4f09d2e 100644 (file)
@@ -135,8 +135,8 @@ class ShareFinder:
         lp = self.log(format="sending DYHB to [%(name)s]", name=server.name(),
                       level=log.NOISY, umid="Io7pyg")
         time_sent = now()
-        d_ev = self._download_status.add_dyhb_sent(server.get_serverid(),
-                                                   time_sent)
+        d_ev = self._download_status.add_dyhb_request(server.get_serverid(),
+                                                      time_sent)
         # TODO: get the timer from a Server object, it knows best
         self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                      self.overdue, req)
@@ -218,7 +218,7 @@ class ShareFinder:
         eventually(self.share_consumer.got_shares, shares)
 
     def _got_error(self, f, server, req, d_ev, lp):
-        d_ev.finished("error", now())
+        d_ev.error(now())
         self.log(format="got error from [%(name)s]",
                  name=server.name(), failure=f,
                  level=log.UNUSUAL, parent=lp, umid="zUKdCw")
index 04482e630b1609d412ef7fbfad08379e4d4babfe..428bca895d1f0f99ade19ed3cb275e0ce40cc420 100644 (file)
@@ -1,6 +1,7 @@
 
 import time
 now = time.time
+from zope.interface import Interface
 from twisted.python.failure import Failure
 from twisted.internet import defer
 from foolscap.api import eventually
@@ -17,6 +18,11 @@ from fetcher import SegmentFetcher
 from segmentation import Segmentation
 from common import BadCiphertextHashError
 
+class IDownloadStatusHandlingConsumer(Interface):
+    def set_download_status_read_event(read_ev):
+        """Record the DownloadStatus 'read event', to be updated with the
+        time it takes to decrypt each chunk of data."""
+
 class Cancel:
     def __init__(self, f):
         self._f = f
@@ -72,7 +78,7 @@ class DownloadNode:
         # things to track callers that want data
 
         # _segment_requests can have duplicates
-        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
+        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
         self._active_segment = None # a SegmentFetcher, with .segnum
 
         self._segsize_observers = observer.OneShotObserverList()
@@ -119,22 +125,25 @@ class DownloadNode:
     # things called by outside callers, via CiphertextFileNode. get_segment()
     # may also be called by Segmentation.
 
-    def read(self, consumer, offset=0, size=None, read_ev=None):
+    def read(self, consumer, offset, size):
         """I am the main entry point, from which FileNode.read() can get
         data. I feed the consumer with the desired range of ciphertext. I
         return a Deferred that fires (with the consumer) when the read is
         finished.
 
         Note that there is no notion of a 'file pointer': each call to read()
-        uses an independent offset= value."""
+        uses an independent offset= value.
+        """
         # for concurrent operations: each gets its own Segmentation manager
         if size is None:
             size = self._verifycap.size
         # ignore overruns: clip size so offset+size does not go past EOF, and
         # so size is not negative (which indicates that offset >= EOF)
         size = max(0, min(size, self._verifycap.size-offset))
-        if read_ev is None:
-            read_ev = self._download_status.add_read_event(offset, size, now())
+
+        read_ev = self._download_status.add_read_event(offset, size, now())
+        if IDownloadStatusHandlingConsumer.providedBy(consumer):
+            consumer.set_download_status_read_event(read_ev)
 
         lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                      si=base32.b2a(self._verifycap.storage_index)[:8],
@@ -148,7 +157,11 @@ class DownloadNode:
             read_ev.finished(now())
             # no data, so no producer, so no register/unregisterProducer
             return defer.succeed(consumer)
+
+        # for concurrent operations, each read() gets its own Segmentation
+        # manager
         s = Segmentation(self, offset, size, consumer, read_ev, lp)
+
         # this raises an interesting question: what segments to fetch? if
         # offset=0, always fetch the first segment, and then allow
         # Segmentation to be responsible for pulling the subsequent ones if
@@ -186,10 +199,10 @@ class DownloadNode:
                      si=base32.b2a(self._verifycap.storage_index)[:8],
                      segnum=segnum,
                      level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
-        self._download_status.add_segment_request(segnum, now())
+        seg_ev = self._download_status.add_segment_request(segnum, now())
         d = defer.Deferred()
         c = Cancel(self._cancel_request)
-        self._segment_requests.append( (segnum, d, c, lp) )
+        self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
         self._start_new_segment()
         return (d, c)
 
@@ -213,13 +226,13 @@ class DownloadNode:
 
     def _start_new_segment(self):
         if self._active_segment is None and self._segment_requests:
-            segnum = self._segment_requests[0][0]
+            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
             k = self._verifycap.needed_shares
-            lp = self._segment_requests[0][3]
             log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                     node=repr(self), segnum=segnum,
                     level=log.NOISY, parent=lp, umid="wAlnHQ")
             self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
+            seg_ev.activate(now())
             active_shares = [s for s in self._shares if s.is_alive()]
             fetcher.add_shares(active_shares) # this triggers the loop
 
@@ -383,7 +396,8 @@ class DownloadNode:
     def fetch_failed(self, sf, f):
         assert sf is self._active_segment
         # deliver error upwards
-        for (d,c) in self._extract_requests(sf.segnum):
+        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
+            seg_ev.error(now())
             eventually(self._deliver, d, c, f)
         self._active_segment = None
         self._start_new_segment()
@@ -392,26 +406,34 @@ class DownloadNode:
         d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
         d.addCallback(self._check_ciphertext_hash, segnum)
         def _deliver(result):
-            ds = self._download_status
-            if isinstance(result, Failure):
-                ds.add_segment_error(segnum, now())
-            else:
-                (offset, segment, decodetime) = result
-                ds.add_segment_delivery(segnum, now(),
-                                        offset, len(segment), decodetime)
             log.msg(format="delivering segment(%(segnum)d)",
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=self._lp,
                     umid="j60Ojg")
-            for (d,c) in self._extract_requests(segnum):
-                eventually(self._deliver, d, c, result)
+            when = now()
+            if isinstance(result, Failure):
+                # this catches failures in decode or ciphertext hash
+                for (d,c,seg_ev) in self._extract_requests(segnum):
+                    seg_ev.error(when)
+                    eventually(self._deliver, d, c, result)
+            else:
+                (offset, segment, decodetime) = result
+                for (d,c,seg_ev) in self._extract_requests(segnum):
+                    # when we have two requests for the same segment, the
+                    # second one will not be "activated" before the data is
+                    # delivered, so to allow the status-reporting code to see
+                    # consistent behavior, we activate them all now. The
+                    # SegmentEvent will ignore duplicate activate() calls.
+                    # Note that this will result in an inaccurate "receive
+                    # speed" for the second request.
+                    seg_ev.activate(when)
+                    seg_ev.deliver(when, offset, len(segment), decodetime)
+                    eventually(self._deliver, d, c, result)
             self._active_segment = None
             self._start_new_segment()
         d.addBoth(_deliver)
-        d.addErrback(lambda f:
-                     log.err("unhandled error during process_blocks",
-                             failure=f, level=log.WEIRD,
-                             parent=self._lp, umid="MkEsCg"))
+        d.addErrback(log.err, "unhandled error during process_blocks",
+                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")
 
     def _decode_blocks(self, segnum, blocks):
         tail = (segnum == self.num_segments-1)
@@ -479,7 +501,8 @@ class DownloadNode:
     def _extract_requests(self, segnum):
         """Remove matching requests and return their (d,c) tuples so that the
         caller can retire them."""
-        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
+        retire = [(d,c,seg_ev)
+                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                   if segnum0 == segnum]
         self._segment_requests = [t for t in self._segment_requests
                                   if t[0] != segnum]
@@ -488,7 +511,7 @@ class DownloadNode:
     def _cancel_request(self, c):
         self._segment_requests = [t for t in self._segment_requests
                                   if t[2] != c]
-        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
+        segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]
         # self._active_segment might be None in rare circumstances, so make
         # sure we tolerate it
         if self._active_segment and self._active_segment.segnum not in segnums:
index 7c9f5cf65460ff6b7fe07b33dbce8dd492abf34c..84dddbe98fb5d6a3d30f72d363e96f51dfbed955 100644 (file)
@@ -123,6 +123,8 @@ class Segmentation:
         # the consumer might call our .pauseProducing() inside that write()
         # call, setting self._hungry=False
         self._read_ev.update(len(desired_data), 0, 0)
+        # note: filenode.DecryptingConsumer is responsible for calling
+        # _read_ev.update with how much decrypt_time was consumed
         self._maybe_fetch_next()
 
     def _retry_bad_segment(self, f):
index 32be47a4425dc1bcc9b5f42ef50d062ee7191077..95173ae6d3fee501dfec66a21bb2c286bbc78630 100644 (file)
@@ -726,12 +726,12 @@ class Share:
                          share=repr(self),
                          start=start, length=length,
                          level=log.NOISY, parent=self._lp, umid="sgVAyA")
-            req_ev = ds.add_request_sent(self._server.get_serverid(),
-                                         self._shnum,
-                                         start, length, now())
+            block_ev = ds.add_block_request(self._server.get_serverid(),
+                                            self._shnum,
+                                            start, length, now())
             d = self._send_request(start, length)
-            d.addCallback(self._got_data, start, length, req_ev, lp)
-            d.addErrback(self._got_error, start, length, req_ev, lp)
+            d.addCallback(self._got_data, start, length, block_ev, lp)
+            d.addErrback(self._got_error, start, length, block_ev, lp)
             d.addCallback(self._trigger_loop)
             d.addErrback(lambda f:
                          log.err(format="unhandled error during send_request",
@@ -741,8 +741,8 @@ class Share:
     def _send_request(self, start, length):
         return self._rref.callRemote("read", start, length)
 
-    def _got_data(self, data, start, length, req_ev, lp):
-        req_ev.finished(len(data), now())
+    def _got_data(self, data, start, length, block_ev, lp):
+        block_ev.finished(len(data), now())
         if not self._alive:
             return
         log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
@@ -784,8 +784,8 @@ class Share:
         # the wanted/needed span is only "wanted" for the first pass. Once
         # the offset table arrives, it's all "needed".
 
-    def _got_error(self, f, start, length, req_ev, lp):
-        req_ev.finished("error", now())
+    def _got_error(self, f, start, length, block_ev, lp):
+        block_ev.error(now())
         log.msg(format="error requesting %(start)d+%(length)d"
                 " from %(server)s for si %(si)s",
                 start=start, length=length,
index 4576d92c761e0b950aa3b3c047f45e07739b7c68..0bf3f715e664b27349296025da1c946b7c8b994c 100644 (file)
@@ -3,29 +3,66 @@ import itertools
 from zope.interface import implements
 from allmydata.interfaces import IDownloadStatus
 
-class RequestEvent:
-    def __init__(self, download_status, tag):
-        self._download_status = download_status
-        self._tag = tag
-    def finished(self, received, when):
-        self._download_status.add_request_finished(self._tag, received, when)
+class ReadEvent:
+    def __init__(self, ev, ds):
+        self._ev = ev
+        self._ds = ds
+    def update(self, bytes, decrypttime, pausetime):
+        self._ev["bytes_returned"] += bytes
+        self._ev["decrypt_time"] += decrypttime
+        self._ev["paused_time"] += pausetime
+    def finished(self, finishtime):
+        self._ev["finish_time"] = finishtime
+        self._ds.update_last_timestamp(finishtime)
+
+class SegmentEvent:
+    def __init__(self, ev, ds):
+        self._ev = ev
+        self._ds = ds
+    def activate(self, when):
+        if self._ev["active_time"] is None:
+            self._ev["active_time"] = when
+    def deliver(self, when, start, length, decodetime):
+        assert self._ev["active_time"] is not None
+        self._ev["finish_time"] = when
+        self._ev["success"] = True
+        self._ev["decode_time"] = decodetime
+        self._ev["segment_start"] = start
+        self._ev["segment_length"] = length
+        self._ds.update_last_timestamp(when)
+    def error(self, when):
+        self._ev["finish_time"] = when
+        self._ev["success"] = False
+        self._ds.update_last_timestamp(when)
 
 class DYHBEvent:
-    def __init__(self, download_status, tag):
-        self._download_status = download_status
-        self._tag = tag
+    def __init__(self, ev, ds):
+        self._ev = ev
+        self._ds = ds
+    def error(self, when):
+        self._ev["finish_time"] = when
+        self._ev["success"] = False
+        self._ds.update_last_timestamp(when)
     def finished(self, shnums, when):
-        self._download_status.add_dyhb_finished(self._tag, shnums, when)
+        self._ev["finish_time"] = when
+        self._ev["success"] = True
+        self._ev["response_shnums"] = shnums
+        self._ds.update_last_timestamp(when)
+
+class BlockRequestEvent:
+    def __init__(self, ev, ds):
+        self._ev = ev
+        self._ds = ds
+    def finished(self, received, when):
+        self._ev["finish_time"] = when
+        self._ev["success"] = True
+        self._ev["response_length"] = received
+        self._ds.update_last_timestamp(when)
+    def error(self, when):
+        self._ev["finish_time"] = when
+        self._ev["success"] = False
+        self._ds.update_last_timestamp(when)
 
-class ReadEvent:
-    def __init__(self, download_status, tag):
-        self._download_status = download_status
-        self._tag = tag
-    def update(self, bytes, decrypttime, pausetime):
-        self._download_status.update_read_event(self._tag, bytes,
-                                                decrypttime, pausetime)
-    def finished(self, finishtime):
-        self._download_status.finish_read_event(self._tag, finishtime)
 
 class DownloadStatus:
     # There is one DownloadStatus for each CiphertextFileNode. The status
@@ -38,110 +75,115 @@ class DownloadStatus:
         self.size = size
         self.counter = self.statusid_counter.next()
         self.helper = False
-        self.started = None
-        # self.dyhb_requests tracks "do you have a share" requests and
-        # responses. It maps serverid to a tuple of:
-        #  send time
-        #  tuple of response shnums (None if response hasn't arrived, "error")
-        #  response time (None if response hasn't arrived yet)
-        self.dyhb_requests = {}
-
-        # self.requests tracks share-data requests and responses. It maps
-        # serverid to a tuple of:
-        #  shnum,
-        #  start,length,  (of data requested)
-        #  send time
-        #  response length (None if reponse hasn't arrived yet, or "error")
-        #  response time (None if response hasn't arrived)
-        self.requests = {}
-
-        # self.segment_events tracks segment requests and delivery. It is a
-        # list of:
-        #  type ("request", "delivery", "error")
-        #  segment number
-        #  event time
-        #  segment start (file offset of first byte, None except in "delivery")
-        #  segment length (only in "delivery")
-        #  time spent in decode (only in "delivery")
-        self.segment_events = []
 
-        # self.read_events tracks read() requests. It is a list of:
+        self.first_timestamp = None
+        self.last_timestamp = None
+
+        # all four of these _events lists are sorted by start_time, because
+        # they are strictly append-only (some elements are later mutated in
+        # place, but none are removed or inserted in the middle).
+
+        # self.read_events tracks read() requests. It is a list of dicts,
+        # each with the following keys:
         #  start,length  (of data requested)
-        #  request time
-        #  finish time (None until finished)
-        #  bytes returned (starts at 0, grows as segments are delivered)
-        #  time spent in decrypt (None for ciphertext-only reads)
-        #  time spent paused
+        #  start_time
+        #  finish_time (None until finished)
+        #  bytes_returned (starts at 0, grows as segments are delivered)
+        #  decrypt_time (time spent in decrypt, None for ciphertext-only reads)
+        #  paused_time (time spent paused by client via pauseProducing)
         self.read_events = []
 
+        # self.segment_events tracks segment requests and their resolution.
+        # It is a list of dicts:
+        #  segment_number
+        #  start_time
+        #  active_time (None until work has begun)
+        #  decode_time (time spent in decode, None until delievered)
+        #  finish_time (None until resolved)
+        #  success (None until resolved, then boolean)
+        #  segment_start (file offset of first byte, None until delivered)
+        #  segment_length (None until delivered)
+        self.segment_events = []
+
+        # self.dyhb_requests tracks "do you have a share" requests and
+        # responses. It is a list of dicts:
+        #  serverid (binary)
+        #  start_time
+        #  success (None until resolved, then boolean)
+        #  response_shnums (tuple, None until successful)
+        #  finish_time (None until resolved)
+        self.dyhb_requests = []
+
+        # self.block_requests tracks share-data requests and responses. It is
+        # a list of dicts:
+        #  serverid (binary),
+        #  shnum,
+        #  start,length,  (of data requested)
+        #  start_time
+        #  finish_time (None until resolved)
+        #  success (None until resolved, then bool)
+        #  response_length (None until success)
+        self.block_requests = []
+
         self.known_shares = [] # (serverid, shnum)
         self.problems = []
 
 
-    def add_dyhb_sent(self, serverid, when):
-        r = (when, None, None)
-        if serverid not in self.dyhb_requests:
-            self.dyhb_requests[serverid] = []
-        self.dyhb_requests[serverid].append(r)
-        tag = (serverid, len(self.dyhb_requests[serverid])-1)
-        return DYHBEvent(self, tag)
-
-    def add_dyhb_finished(self, tag, shnums, when):
-        # received="error" on error, else tuple(shnums)
-        (serverid, index) = tag
-        r = self.dyhb_requests[serverid][index]
-        (sent, _, _) = r
-        r = (sent, shnums, when)
-        self.dyhb_requests[serverid][index] = r
-
-    def add_request_sent(self, serverid, shnum, start, length, when):
-        r = (shnum, start, length, when, None, None)
-        if serverid not in self.requests:
-            self.requests[serverid] = []
-        self.requests[serverid].append(r)
-        tag = (serverid, len(self.requests[serverid])-1)
-        return RequestEvent(self, tag)
-
-    def add_request_finished(self, tag, received, when):
-        # received="error" on error, else len(data)
-        (serverid, index) = tag
-        r = self.requests[serverid][index]
-        (shnum, start, length, sent, _, _) = r
-        r = (shnum, start, length, sent, received, when)
-        self.requests[serverid][index] = r
+    def add_read_event(self, start, length, when):
+        if self.first_timestamp is None:
+            self.first_timestamp = when
+        r = { "start": start,
+              "length": length,
+              "start_time": when,
+              "finish_time": None,
+              "bytes_returned": 0,
+              "decrypt_time": 0,
+              "paused_time": 0,
+              }
+        self.read_events.append(r)
+        return ReadEvent(r, self)
 
     def add_segment_request(self, segnum, when):
-        if self.started is None:
-            self.started = when
-        r = ("request", segnum, when, None, None, None)
-        self.segment_events.append(r)
-    def add_segment_delivery(self, segnum, when, start, length, decodetime):
-        r = ("delivery", segnum, when, start, length, decodetime)
-        self.segment_events.append(r)
-    def add_segment_error(self, segnum, when):
-        r = ("error", segnum, when, None, None, None)
+        if self.first_timestamp is None:
+            self.first_timestamp = when
+        r = { "segment_number": segnum,
+              "start_time": when,
+              "active_time": None,
+              "finish_time": None,
+              "success": None,
+              "decode_time": None,
+              "segment_start": None,
+              "segment_length": None,
+              }
         self.segment_events.append(r)
+        return SegmentEvent(r, self)
 
-    def add_read_event(self, start, length, when):
-        if self.started is None:
-            self.started = when
-        r = (start, length, when, None, 0, 0, 0)
-        self.read_events.append(r)
-        tag = len(self.read_events)-1
-        return ReadEvent(self, tag)
-    def update_read_event(self, tag, bytes_d, decrypt_d, paused_d):
-        r = self.read_events[tag]
-        (start, length, requesttime, finishtime, bytes, decrypt, paused) = r
-        bytes += bytes_d
-        decrypt += decrypt_d
-        paused += paused_d
-        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
-        self.read_events[tag] = r
-    def finish_read_event(self, tag, finishtime):
-        r = self.read_events[tag]
-        (start, length, requesttime, _, bytes, decrypt, paused) = r
-        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
-        self.read_events[tag] = r
+    def add_dyhb_request(self, serverid, when):
+        r = { "serverid": serverid,
+              "start_time": when,
+              "success": None,
+              "response_shnums": None,
+              "finish_time": None,
+              }
+        self.dyhb_requests.append(r)
+        return DYHBEvent(r, self)
+
+    def add_block_request(self, serverid, shnum, start, length, when):
+        r = { "serverid": serverid,
+              "shnum": shnum,
+              "start": start,
+              "length": length,
+              "start_time": when,
+              "finish_time": None,
+              "success": None,
+              "response_length": None,
+              }
+        self.block_requests.append(r)
+        return BlockRequestEvent(r, self)
+
+    def update_last_timestamp(self, when):
+        if self.last_timestamp is None or when > self.last_timestamp:
+            self.last_timestamp = when
 
     def add_known_share(self, serverid, shnum):
         self.known_shares.append( (serverid, shnum) )
@@ -160,15 +202,12 @@ class DownloadStatus:
         # mention all outstanding segment requests
         outstanding = set()
         errorful = set()
-        for s_ev in self.segment_events:
-            (etype, segnum, when, segstart, seglen, decodetime) = s_ev
-            if etype == "request":
-                outstanding.add(segnum)
-            elif etype == "delivery":
-                outstanding.remove(segnum)
-            else: # "error"
-                outstanding.remove(segnum)
-                errorful.add(segnum)
+        outstanding = set([s_ev["segment_number"]
+                           for s_ev in self.segment_events
+                           if s_ev["finish_time"] is None])
+        errorful = set([s_ev["segment_number"]
+                        for s_ev in self.segment_events
+                        if s_ev["success"] is False])
         def join(segnums):
             if len(segnums) == 1:
                 return "segment %s" % list(segnums)[0]
@@ -191,10 +230,9 @@ class DownloadStatus:
             return 0.0
         total_outstanding, total_received = 0, 0
         for r_ev in self.read_events:
-            (start, length, ign1, finishtime, bytes, ign2, ign3) = r_ev
-            if finishtime is None:
-                total_outstanding += length
-                total_received += bytes
+            if r_ev["finish_time"] is None:
+                total_outstanding += r_ev["length"]
+                total_received += r_ev["bytes_returned"]
             # else ignore completed requests
         if not total_outstanding:
             return 1.0
@@ -213,6 +251,6 @@ class DownloadStatus:
         return False
 
     def get_started(self):
-        return self.started
+        return self.first_timestamp
     def get_results(self):
         return None # TODO
index 8fc47725ed30445b0ccdaa7749be22808d650489..84d75035a841a36a5b2350aa5692345eddb667a3 100644 (file)
@@ -3,7 +3,7 @@ import binascii
 import copy
 import time
 now = time.time
-from zope.interface import implements, Interface
+from zope.interface import implements
 from twisted.internet import defer
 from twisted.internet.interfaces import IConsumer
 
@@ -16,14 +16,10 @@ from pycryptopp.cipher.aes import AES
 # local imports
 from allmydata.immutable.checker import Checker
 from allmydata.immutable.repairer import Repairer
-from allmydata.immutable.downloader.node import DownloadNode
+from allmydata.immutable.downloader.node import DownloadNode, \
+     IDownloadStatusHandlingConsumer
 from allmydata.immutable.downloader.status import DownloadStatus
 
-class IDownloadStatusHandlingConsumer(Interface):
-    def set_download_status_read_event(read_ev):
-        """Record the DownloadStatus 'read event', to be updated with the
-        time it takes to decrypt each chunk of data."""
-
 class CiphertextFileNode:
     def __init__(self, verifycap, storage_broker, secret_holder,
                  terminator, history):
@@ -55,14 +51,7 @@ class CiphertextFileNode:
         return a Deferred that fires (with the consumer) when the read is
         finished."""
         self._maybe_create_download_node()
-        actual_size = size
-        if actual_size is None:
-            actual_size = self._verifycap.size - offset
-        read_ev = self._download_status.add_read_event(offset, actual_size,
-                                                       now())
-        if IDownloadStatusHandlingConsumer.providedBy(consumer):
-            consumer.set_download_status_read_event(read_ev)
-        return self._node.read(consumer, offset, size, read_ev)
+        return self._node.read(consumer, offset, size)
 
     def get_segment(self, segnum):
         """Begin downloading a segment. I return a tuple (d, c): 'd' is a
@@ -177,7 +166,7 @@ class DecryptingConsumer:
 
     def __init__(self, consumer, readkey, offset):
         self._consumer = consumer
-        self._read_event = None
+        self._read_ev = None
         # TODO: pycryptopp CTR-mode needs random-access operations: I want
         # either a=AES(readkey, offset) or better yet both of:
         #  a=AES(readkey, offset=0)
@@ -190,7 +179,7 @@ class DecryptingConsumer:
         self._decryptor.process("\x00"*offset_small)
 
     def set_download_status_read_event(self, read_ev):
-        self._read_event = read_ev
+        self._read_ev = read_ev
 
     def registerProducer(self, producer, streaming):
         # this passes through, so the real consumer can flow-control the real
@@ -203,9 +192,9 @@ class DecryptingConsumer:
     def write(self, ciphertext):
         started = now()
         plaintext = self._decryptor.process(ciphertext)
-        if self._read_event:
+        if self._read_ev:
             elapsed = now() - started
-            self._read_event.update(0, elapsed, 0)
+            self._read_ev.update(0, elapsed, 0)
         self._consumer.write(plaintext)
 
 class ImmutableFileNode:
index 91ff8ada52955043c10cea288b3143ddf6e8ce7a..f6154cb500371ce697f83216a181b4ed0d017d24 100644 (file)
@@ -1217,14 +1217,16 @@ class Status(unittest.TestCase):
         now = 12345.1
         ds = DownloadStatus("si-1", 123)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(0, now)
+        ev0 = ds.add_segment_request(0, now)
         self.failUnlessEqual(ds.get_status(), "fetching segment 0")
-        ds.add_segment_delivery(0, now+1, 0, 1000, 2.0)
+        ev0.activate(now+0.5)
+        ev0.deliver(now+1, 0, 1000, 2.0)
         self.failUnlessEqual(ds.get_status(), "idle")
-        ds.add_segment_request(2, now+2)
-        ds.add_segment_request(1, now+2)
+        ev2 = ds.add_segment_request(2, now+2)
+        del ev2 # hush pyflakes
+        ev1 = ds.add_segment_request(1, now+2)
         self.failUnlessEqual(ds.get_status(), "fetching segments 1,2")
-        ds.add_segment_error(1, now+3)
+        ev1.error(now+3)
         self.failUnlessEqual(ds.get_status(),
                              "fetching segment 2; errors on segment 1")