4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
11 from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
14 """I am responsible for a single offset+size read of the file. I handle
15 segmentation: I figure out which segments are necessary, request them
16 (from my CiphertextDownloader) in order, and trim the segments down to
17 match the offset+size span. I use the Producer/Consumer interface to only
18 request one segment at a time.
20 implements(IPushProducer)
21 def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
24 self._active_segnum = None
25 self._cancel_segment_request = None
26 # these are updated as we deliver data. At any given time, we still
27 # want to download file[offset:offset+size]
30 assert offset+size <= node._verifycap.size
31 self._consumer = consumer
32 self._read_ev = read_ev
33 self._start_pause = None
38 self._deferred = defer.Deferred()
39 self._deferred.addBoth(self._done)
40 self._consumer.registerProducer(self, True)
41 self._maybe_fetch_next()
45 self._consumer.unregisterProducer()
48 def _maybe_fetch_next(self):
49 if not self._alive or not self._hungry:
51 if self._active_segnum is not None:
55 def _fetch_next(self):
60 self._deferred.callback(self._consumer)
63 have_actual_segment_size = n.segment_size is not None
65 if not have_actual_segment_size:
67 segment_size = n.segment_size or n.guessed_segment_size
69 # great! we want segment0 for sure
72 # this might be a guess
73 wanted_segnum = self._offset // segment_size
74 log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
75 offset=self._offset, guess=guess_s, segnum=wanted_segnum,
76 level=log.NOISY, parent=self._lp, umid="5WfN0w")
77 self._active_segnum = wanted_segnum
78 d,c = n.get_segment(wanted_segnum, self._lp)
79 self._cancel_segment_request = c
80 d.addBoth(self._request_retired)
81 d.addCallback(self._got_segment, wanted_segnum)
82 if not have_actual_segment_size:
84 d.addErrback(self._retry_bad_segment)
85 d.addErrback(self._error)
87 def _request_retired(self, res):
88 self._active_segnum = None
89 self._cancel_segment_request = None
92 def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum):
93 self._cancel_segment_request = None
94 # we got file[segment_start:segment_start+len(segment)]
95 # we want file[self._offset:self._offset+self._size]
96 log.msg(format="Segmentation got data:"
97 " want [%(wantstart)d-%(wantend)d),"
98 " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
99 wantstart=self._offset, wantend=self._offset+self._size,
100 segstart=segment_start, segend=segment_start+len(segment),
101 segnum=wanted_segnum,
102 level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")
104 o = overlap(segment_start, len(segment), self._offset, self._size)
105 # the overlap is file[o[0]:o[0]+o[1]]
106 if not o or o[0] != self._offset:
107 # we didn't get the first byte, so we can't use this segment
108 log.msg("Segmentation handed wrong data:"
109 " want [%d-%d), given [%d-%d), for segnum=%d,"
111 % (self._offset, self._offset+self._size,
112 segment_start, segment_start+len(segment),
113 wanted_segnum, self._node._si_prefix),
114 level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
115 # we may retry if the segnum we asked was based on a guess
116 raise WrongSegmentError("I was given the wrong data.")
117 offset_in_segment = self._offset - segment_start
118 desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
120 self._offset += len(desired_data)
121 self._size -= len(desired_data)
122 self._consumer.write(desired_data)
123 # the consumer might call our .pauseProducing() inside that write()
124 # call, setting self._hungry=False
125 self._read_ev.update(len(desired_data), 0, 0)
126 # note: filenode.DecryptingConsumer is responsible for calling
127 # _read_ev.update with how much decrypt_time was consumed
128 self._maybe_fetch_next()
130 def _retry_bad_segment(self, f):
131 f.trap(WrongSegmentError, BadSegmentNumberError)
132 # we guessed the segnum wrong: either one that doesn't overlap with
133 # the start of our desired region, or one that's beyond the end of
134 # the world. Now that we have the right information, we're allowed to
136 assert self._node.segment_size is not None
137 return self._maybe_fetch_next()
140 log.msg("Error in Segmentation", failure=f,
141 level=log.WEIRD, parent=self._lp, umid="EYlXBg")
144 self._deferred.errback(f)
146 def stopProducing(self):
147 log.msg("asked to stopProducing",
148 level=log.NOISY, parent=self._lp, umid="XIyL9w")
151 # cancel any outstanding segment request
152 if self._cancel_segment_request:
153 self._cancel_segment_request.cancel()
154 self._cancel_segment_request = None
155 e = DownloadStopped("our Consumer called stopProducing()")
156 self._deferred.errback(e)
158 def pauseProducing(self):
160 self._start_pause = now()
161 def resumeProducing(self):
163 eventually(self._maybe_fetch_next)
164 if self._start_pause is not None:
165 paused = now() - self._start_pause
166 self._read_ev.update(0, 0, paused)
167 self._start_pause = None