4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
10 from allmydata.interfaces import DownloadStopped
12 from common import BadSegmentNumberError, WrongSegmentError
15 """I am responsible for a single offset+size read of the file. I handle
16 segmentation: I figure out which segments are necessary, request them
17 (from my CiphertextDownloader) in order, and trim the segments down to
18 match the offset+size span. I use the Producer/Consumer interface to only
19 request one segment at a time.
21 implements(IPushProducer)
22 def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
# Record the parameters of one offset+size read and reset the per-request
# bookkeeping.
# NOTE(review): this listing skips source lines 23-24, 29-30 and 35. Other
# methods read self._node, self._hungry, self._offset, self._size and
# self._lp, which are presumably assigned on those elided lines - confirm
# against the full file.
# Segment number of the request currently in flight, or None when idle
# (set in _fetch_next, cleared in _request_retired).
25 self._active_segnum = None
# Second element of the pair returned by node.get_segment(); stopProducing
# calls .cancel() on it.
26 self._cancel_segment_request = None
27 # these are updated as we deliver data. At any given time, we still
28 # want to download file[offset:offset+size]
# The requested span must not extend beyond the size recorded in the node's
# verifycap (presumably the total file size - confirm).
31 assert offset+size <= node._verifycap.size
32 self._consumer = consumer
33 self._read_ev = read_ev
# Value of now() taken when pauseProducing was last called; None while not
# paused (see resumeProducing).
34 self._start_pause = None
# NOTE(review): the `def` line that owns the next four statements is not
# visible in this listing (the embedded numbering jumps from 34 to 39), so
# the method name and any statements before line 39 cannot be confirmed here.
# Create the Deferred that reports the outcome of the whole read: it is fired
# with the consumer in _fetch_next and errbacked in stopProducing.
39 self._deferred = defer.Deferred()
# Attached with addBoth, so _done runs on success and on failure alike.
40 self._deferred.addBoth(self._done)
# Register with the consumer as a streaming (push) producer - the second
# argument is True - so the consumer may call pauseProducing,
# resumeProducing and stopProducing on this object.
41 self._consumer.registerProducer(self, True)
# Kick off fetching.
42 self._maybe_fetch_next()
# NOTE(review): the `def` line for this statement is not visible in this
# listing (the numbering jumps from 42 to 46, and line 47 is also elided).
# Presumably this is the body of the _done callback that is attached to
# self._deferred with addBoth - confirm against the full file.
# Detach this producer from the consumer.
46 self._consumer.unregisterProducer()
49 def _maybe_fetch_next(self):
# Gatekeeper in front of _fetch_next: checks whether a new segment request
# is appropriate right now.
# NOTE(review): the bodies of both `if` statements and whatever follows them
# (source lines 51 and 53-55) are not visible in this listing. Presumably
# each guard returns early and the method otherwise calls
# self._fetch_next() - confirm against the full file.
# Guard 1: the read is no longer alive, or the consumer is not hungry.
50 if not self._alive or not self._hungry:
# Guard 2: a segment request is already in flight (_active_segnum is set in
# _fetch_next and cleared in _request_retired).
52 if self._active_segnum is not None:
56 def _fetch_next(self):
# Work out which segment holds the byte at self._offset, request it from the
# node, and attach the callbacks that handle the result.
# NOTE(review): this listing skips source lines 57-60, 62-63, 65, 67, 69 and
# 71-72. The names `n` and `guess_s` used below are presumably assigned on
# those lines, and the statement on line 61 is presumably guarded by a
# "nothing left to deliver" check - confirm against the full file.
# Fire the overall Deferred with the consumer as its result.
61 self._deferred.callback(self._consumer)
# n.segment_size is None until the real segment size is known; until then
# n.guessed_segment_size is used instead (see the `or` below).
64 have_actual_segment_size = n.segment_size is not None
66 if not have_actual_segment_size:
68 segment_size = n.segment_size or n.guessed_segment_size
70 # great! we want segment0 for sure
73 # this might be a guess
# Floor division maps the byte offset to the index of the segment holding it.
74 wanted_segnum = self._offset // segment_size
75 log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
76 offset=self._offset, guess=guess_s, segnum=wanted_segnum,
77 level=log.NOISY, parent=self._lp, umid="5WfN0w")
# Mark the request as in flight so _maybe_fetch_next will not start another.
78 self._active_segnum = wanted_segnum
# get_segment returns a pair: a Deferred for the segment, and an object whose
# .cancel() method is used by stopProducing.
79 d,c = n.get_segment(wanted_segnum, self._lp)
80 self._cancel_segment_request = c
# Callback order matters: first clear the in-flight markers (on success or
# failure), then process the data.
81 d.addBoth(self._request_retired)
82 d.addCallback(self._got_segment, wanted_segnum)
# NOTE(review): source line 84 is elided; the retry errback on line 85
# presumably sits inside this `if`, i.e. a retry is only attempted when the
# segment number was derived from a guessed segment size - confirm.
83 if not have_actual_segment_size:
85 d.addErrback(self._retry_bad_segment)
# Final errback for any failure not handled above.
86 d.addErrback(self._error)
88 def _request_retired(self, res):
# First handler on every segment request (attached with addBoth, so it runs
# for both results and failures): forget the in-flight request so that
# _maybe_fetch_next is allowed to issue a new one.
# NOTE(review): source line 91 is not visible in this listing. Because
# _got_segment is chained after this handler and expects the segment tuple,
# this method presumably passes `res` through unchanged - confirm.
89 self._active_segnum = None
90 self._cancel_segment_request = None
93 def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum):
# Handle a delivered segment: check that it covers the next wanted byte, trim
# it to the still-wanted span, hand the data to the consumer, and move on.
#
# The first argument is a tuple unpacked in the signature:
#   segment_start - file offset of the first byte of `segment`
#   segment       - the segment data
#   decodetime    - not used in the visible lines of this method
# wanted_segnum is the segment number that was requested; here it is used
# only for logging.
# Raises WrongSegmentError when the segment does not contain the byte at
# self._offset.
94 self._cancel_segment_request = None
95 # we got file[segment_start:segment_start+len(segment)]
96 # we want file[self._offset:self._offset+self._size]
97 log.msg(format="Segmentation got data:"
98 " want [%(wantstart)d-%(wantend)d),"
99 " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
100 wantstart=self._offset, wantend=self._offset+self._size,
101 segstart=segment_start, segend=segment_start+len(segment),
102 segnum=wanted_segnum,
103 level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")
# overlap() yields a falsy value when the two ranges do not intersect,
# otherwise a pair indexed below as o[0] (start) and o[1] (length).
105 o = overlap(segment_start, len(segment), self._offset, self._size)
106 # the overlap is file[o[0]:o[0]+o[1]]
107 if not o or o[0] != self._offset:
108 # we didn't get the first byte, so we can't use this segment
# NOTE(review): as shown here the format string has five %d placeholders but
# six values are supplied (the last is self._node._si_prefix). Source line
# 111 is elided from this listing and presumably carries the missing
# placeholder - confirm, because otherwise the % operator raises TypeError.
109 log.msg("Segmentation handed wrong data:"
110 " want [%d-%d), given [%d-%d), for segnum=%d,"
112 % (self._offset, self._offset+self._size,
113 segment_start, segment_start+len(segment),
114 wanted_segnum, self._node._si_prefix),
115 level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
116 # we may retry if the segnum we asked was based on a guess
117 raise WrongSegmentError("I was given the wrong data.")
# Slice out exactly the overlapping bytes.
118 offset_in_segment = self._offset - segment_start
119 desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
# Shrink the still-wanted window before handing the data over.
121 self._offset += len(desired_data)
122 self._size -= len(desired_data)
123 self._consumer.write(desired_data)
124 # the consumer might call our .pauseProducing() inside that write()
125 # call, setting self._hungry=False
# Report the number of bytes delivered to the read event.
126 self._read_ev.update(len(desired_data), 0, 0)
127 # note: filenode.DecryptingConsumer is responsible for calling
128 # _read_ev.update with how much decrypt_time was consumed
# Continue with the next segment if that is still appropriate.
129 self._maybe_fetch_next()
131 def _retry_bad_segment(self, f):
# Errback attached in _fetch_next: recover from having requested the wrong
# segment number.
# `f` is presumably a Twisted Failure; trap() lets only the two listed
# exception types through to the code below and re-raises anything else.
132 f.trap(WrongSegmentError, BadSegmentNumberError)
133 # we guessed the segnum wrong: either one that doesn't overlap with
134 # the start of our desired region, or one that's beyond the end of
135 # the world. Now that we have the right information, we're allowed to
# NOTE(review): the sentence above is cut off because source line 136 is not
# visible in this listing; judging by the method name it presumably ends
# with "retry".
# By this point the node must know the real segment size.
137 assert self._node.segment_size is not None
# Try again, and return whatever _maybe_fetch_next returns.
138 return self._maybe_fetch_next()
# NOTE(review): the `def` line for these statements (source lines 139-140)
# and lines 143-144 are not visible in this listing. Presumably this is the
# body of the _error errback that _fetch_next attaches last, with `f` being
# the failure it receives - confirm against the full file.
# Log the failure at WEIRD level.
141 log.msg("Error in Segmentation", failure=f,
142 level=log.WEIRD, parent=self._lp, umid="EYlXBg")
# Fail the overall read with the same failure.
145 self._deferred.errback(f)
147 def stopProducing(self):
# Producer hook called by the consumer to abandon the read: cancel any
# in-flight segment request and fail the overall Deferred with
# DownloadStopped.
# NOTE(review): source lines 150-151 are not visible in this listing;
# presumably they update state flags such as self._alive / self._hungry -
# confirm against the full file.
148 log.msg("asked to stopProducing",
149 level=log.NOISY, parent=self._lp, umid="XIyL9w")
152 # cancel any outstanding segment request
153 if self._cancel_segment_request:
154 self._cancel_segment_request.cancel()
155 self._cancel_segment_request = None
156 e = DownloadStopped("our Consumer called stopProducing()")
157 self._deferred.errback(e)
159 def pauseProducing(self):
# Producer hook called by the consumer to ask for a temporary halt.
# NOTE(review): source line 160 is not visible in this listing; a comment in
# _got_segment says this method sets self._hungry=False, which is presumably
# what that line does.
# Remember when the pause began so resumeProducing can report its length.
# NOTE(review): `now` is not defined in the visible lines; presumably it is
# a module-level clock function defined on elided source lines 1-3.
161 self._start_pause = now()
162 def resumeProducing(self):
# Producer hook called by the consumer when it wants data again.
# NOTE(review): source line 163 is not visible in this listing; presumably
# it sets self._hungry back to True - confirm against the full file.
# Hand _maybe_fetch_next to foolscap's eventually() rather than calling it
# directly (presumably so it runs in a later reactor turn - see the foolscap
# documentation).
164 eventually(self._maybe_fetch_next)
# If a pause was recorded, report its duration to the read event and clear
# the marker. The duration goes in the third argument of update(), which is
# presumably the paused-time slot - confirm against the read event's API.
165 if self._start_pause is not None:
166 paused = now() - self._start_pause
167 self._read_ev.update(0, 0, paused)
168 self._start_pause = None