4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
11 from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
14 """I am responsible for a single offset+size read of the file. I handle
15 segmentation: I figure out which segments are necessary, request them
16 (from my CiphertextDownloader) in order, and trim the segments down to
17 match the offset+size span. I use the Producer/Consumer interface to only
18 request one segment at a time.
20 implements(IPushProducer)
# Constructor: set up a single offset+size read against `node`, register
# with `consumer` as its producer, and kick off the first fetch.
#
# Arguments visible in the signature:
#   node      -- object queried for segments; must expose _verifycap.size
#   offset    -- first byte of the file wanted by this read
#   size      -- number of bytes wanted
#   consumer  -- receives the data; we register ourselves as its producer
#   read_ev   -- event object whose update() is called as data is delivered
#   logparent -- optional parent for log messages
#
# NOTE(review): the embedded line numbering skips 22-23, 28-29 and 34-37,
# so the assignments of attributes that other methods read (self._node,
# self._offset, self._size, self._alive, self._hungry, self._lp) are not
# visible in this view -- confirm against the complete file.
21 def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
# No segment request is outstanding yet, so there is nothing to cancel.
24 self._active_segnum = None
25 self._cancel_segment_request = None
26 # these are updated as we deliver data. At any given time, we still
27 # want to download file[offset:offset+size]
# Sanity check: the requested span must lie within the file.
30 assert offset+size <= node._verifycap.size
31 self._consumer = consumer
32 self._read_ev = read_ev
# Timestamp of the most recent pauseProducing(), or None when not paused.
33 self._start_pause = None
# Fired with the consumer on completion, or errbacked on failure/stop.
38 self._deferred = defer.Deferred()
# Second argument True registers us as a streaming (push) producer.
39 self._consumer.registerProducer(self, True)
40 self._maybe_fetch_next()
# Gatekeeper that decides whether a new segment request may be started.
# NOTE(review): the bodies of both guards (embedded lines 45 and 47) and
# whatever follows them (48-49) are missing from this view; presumably each
# guard returns early and the method then calls self._fetch_next() --
# confirm against the complete file.
43 def _maybe_fetch_next(self):
# Guard 1: we are no longer alive, or the consumer is not hungry.
44 if not self._alive or not self._hungry:
# Guard 2: a segment request is already outstanding.
46 if self._active_segnum is not None:
# Work out which segment holds the next wanted byte, request it from the
# node, and wire up the callback chain that handles the result.
# NOTE(review): embedded lines 51-54, 57-58, 60, 62, 64, 66-67 and 79 are
# missing from this view. In particular the condition guarding the
# completion path, the definition of `n` (presumably self._node) and the
# assignments of `guess_s` are not visible -- confirm against the full file.
50 def _fetch_next(self):
# Completion path: detach from the consumer and fire the result Deferred
# with the consumer. The guarding condition is not visible here
# (presumably "no bytes remain") -- TODO confirm.
55 self._consumer.unregisterProducer()
56 self._deferred.callback(self._consumer)
# True when the node already knows the real segment size.
59 have_actual_segment_size = n.segment_size is not None
61 if not have_actual_segment_size:
# Fall back to the node's guessed size when the real one is not set.
63 segment_size = n.segment_size or n.guessed_segment_size
65 # great! we want segment0 for sure
68 # this might be a guess
# Segment index containing the first byte we still want.
69 wanted_segnum = self._offset // segment_size
70 log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
71 offset=self._offset, guess=guess_s, segnum=wanted_segnum,
72 level=log.NOISY, parent=self._lp, umid="5WfN0w")
# Mark the request as outstanding so _maybe_fetch_next will not start
# another one in parallel.
73 self._active_segnum = wanted_segnum
# get_segment() returns a Deferred plus an object whose cancel() is used
# by stopProducing().
74 d,c = n.get_segment(wanted_segnum, self._lp)
75 self._cancel_segment_request = c
# Clear the outstanding-request state on both success and failure.
76 d.addBoth(self._request_retired)
77 d.addCallback(self._got_segment, wanted_segnum)
# Indentation is lost in this view; presumably the retry errback is only
# attached when the segment size was a guess -- TODO confirm.
78 if not have_actual_segment_size:
80 d.addErrback(self._retry_bad_segment)
# Any failure not handled above ends the read via _error.
81 d.addErrback(self._error)
# Attached with addBoth() in _fetch_next: clears the outstanding-request
# state whether the segment fetch succeeded or failed.
# NOTE(review): embedded line 86 is missing from this view; since the next
# callback in the chain (_got_segment) needs the segment result, this
# method presumably ends with `return res` -- confirm.
83 def _request_retired(self, res):
84 self._active_segnum = None
85 self._cancel_segment_request = None
# Callback for a fetched segment: trim it to the part we still want, hand
# that to the consumer, advance our position, and try to fetch more.
#
# The first positional argument is a 3-tuple unpacked in the signature
# (Python 2 tuple-parameter syntax): (segment_start, segment, decodetime).
# `decodetime` is not used in the visible lines. `wanted_segnum` is the
# segment number that was requested.
#
# Raises WrongSegmentError when the segment does not contain the first
# byte we want.
#
# NOTE(review): embedded lines 97 and 106 are missing from this view. The
# first log call's format uses %(segnum)d but no segnum= keyword is
# visible, and the second log call passes six values to a visible format
# with five placeholders; both are presumably completed by the missing
# lines -- confirm against the complete file.
88 def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum):
89 self._cancel_segment_request = None
90 # we got file[segment_start:segment_start+len(segment)]
91 # we want file[self._offset:self._offset+self._size]
92 log.msg(format="Segmentation got data:"
93 " want [%(wantstart)d-%(wantend)d),"
94 " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
95 wantstart=self._offset, wantend=self._offset+self._size,
96 segstart=segment_start, segend=segment_start+len(segment),
98 level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")
# Intersect the delivered span with the span we still want. The result is
# used below as a (start, length) pair and is falsy when there is none.
100 o = overlap(segment_start, len(segment), self._offset, self._size)
101 # the overlap is file[o[0]:o[0]+o[1]]
102 if not o or o[0] != self._offset:
103 # we didn't get the first byte, so we can't use this segment
104 log.msg("Segmentation handed wrong data:"
105 " want [%d-%d), given [%d-%d), for segnum=%d,"
107 % (self._offset, self._offset+self._size,
108 segment_start, segment_start+len(segment),
109 wanted_segnum, self._node._si_prefix),
110 level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
111 # we may retry if the segnum we asked was based on a guess
112 raise WrongSegmentError("I was given the wrong data.")
# Slice out exactly the overlapping bytes, relative to the segment start.
113 offset_in_segment = self._offset - segment_start
114 desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
# Advance our position before writing to the consumer.
116 self._offset += len(desired_data)
117 self._size -= len(desired_data)
118 self._consumer.write(desired_data)
119 # the consumer might call our .pauseProducing() inside that write()
120 # call, setting self._hungry=False
# First argument is the number of bytes just delivered; the meaning of the
# two zero arguments is not visible here (resumeProducing passes paused
# time as the third) -- TODO confirm against read_ev's definition.
121 self._read_ev.update(len(desired_data), 0, 0)
122 self._maybe_fetch_next()
# Errback attached in _fetch_next: handles a wrongly chosen segment number
# by starting over through _maybe_fetch_next().
124 def _retry_bad_segment(self, f):
# trap() lets only these two failure types through to the code below; any
# other failure is re-raised and continues down the errback chain.
125 f.trap(WrongSegmentError, BadSegmentNumberError)
126 # we guessed the segnum wrong: either one that doesn't overlap with
127 # the start of our desired region, or one that's beyond the end of
128 # the world. Now that we have the right information, we're allowed to
# NOTE(review): the comment above is cut off because embedded line 129 is
# missing from this view.
# By now the node is expected to know the real segment size.
130 assert self._node.segment_size is not None
131 return self._maybe_fetch_next()
# Body of the final errback in the fetch chain: log the failure, detach
# from the consumer, and errback the result Deferred with the same failure.
# NOTE(review): the `def` line (embedded line 133) and embedded lines
# 136-137 are missing from this view. The method is presumably
# `_error(self, f)`, as referenced by d.addErrback(self._error) in
# _fetch_next -- confirm against the complete file.
134 log.msg("Error in Segmentation", failure=f,
135 level=log.WEIRD, parent=self._lp, umid="EYlXBg")
138 self._consumer.unregisterProducer()
139 self._deferred.errback(f)
# Producer-interface method called by the consumer to abandon the read:
# cancel any in-flight segment request and errback the result Deferred
# with DownloadStopped.
# NOTE(review): embedded lines 142-143 are missing from this view; any
# state they update (for example self._alive / self._hungry) is not
# visible -- confirm against the complete file.
141 def stopProducing(self):
144 # cancel any outstanding segment request
145 if self._cancel_segment_request:
146 self._cancel_segment_request.cancel()
# Drop the handle so it is not cancelled twice.
147 self._cancel_segment_request = None
148 e = DownloadStopped("our Consumer called stopProducing()")
149 self._deferred.errback(e)
# Producer-interface method called by the consumer to pause delivery.
# Records when the pause began so resumeProducing() can report how long
# it lasted.
# NOTE(review): embedded line 152 is missing from this view; per the
# comment in _got_segment it presumably sets self._hungry = False. The
# definition/import of now() is also not visible -- confirm.
151 def pauseProducing(self):
153 self._start_pause = now()
# Producer-interface method called by the consumer to resume delivery:
# schedule another fetch attempt and report the paused time to the read
# event.
# NOTE(review): embedded line 155 is missing from this view; presumably
# it sets self._hungry = True -- confirm against the complete file.
154 def resumeProducing(self):
# Schedule the fetch attempt via foolscap's eventually() rather than
# calling _maybe_fetch_next directly from inside the consumer's call.
156 eventually(self._maybe_fetch_next)
# Only report a pause duration if pauseProducing() recorded a start time.
157 if self._start_pause is not None:
158 paused = now() - self._start_pause
# Third argument carries the paused time; no bytes were delivered here.
159 self._read_ev.update(0, 0, paused)
160 self._start_pause = None