git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/segmentation.py
move DownloadStopped from download.common to interfaces
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / segmentation.py
1
2 import time
3 now = time.time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
10 from allmydata.interfaces import DownloadStopped
11
12 from common import BadSegmentNumberError, WrongSegmentError
13
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.

    Lifecycle: construct me, then call start(). start() registers me with
    the consumer as a streaming (push) producer and returns a Deferred. That
    Deferred fires with the consumer once every requested byte has been
    written. It errbacks if a segment fetch fails or the consumer calls
    stopProducing(). Either way, I unregister from the consumer when it fires.
    """
    # NOTE(review): implements() is Python-2-only class advice; a Python 3
    # port needs @implementer(IPushProducer) imported from zope.interface.
    implements(IPushProducer)

    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        """
        :param node: download node; this class uses its get_segment(),
            segment_size, guessed_segment_size, _verifycap.size and
            _si_prefix.
        :param offset: file offset of the first byte to deliver.
        :param size: number of bytes to deliver; offset+size must not
            exceed the file size.
        :param consumer: receives the trimmed data through write().
        :param read_ev: status event; this class calls
            update(nbytes, 0, 0) per delivery and update(0, 0, paused_secs)
            on resume (the middle slot is presumably decrypt time, filled in
            elsewhere -- confirm).
        :param logparent: parent log id for log.msg().
        """
        self._node = node
        self._hungry = True
        self._alive = False  # becomes True in start()
        self._deferred = None  # created in start()
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        assert offset+size <= node._verifycap.size
        self._consumer = consumer
        self._read_ev = read_ev
        self._start_pause = None
        self._lp = logparent

    def start(self):
        """Register with the consumer and begin fetching.

        Returns a Deferred that fires with the consumer when done.
        """
        self._alive = True
        self._deferred = defer.Deferred()
        self._deferred.addBoth(self._done)
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _done(self, res):
        # Runs on success and failure alike: detach from the consumer and
        # pass the result (or Failure) through unchanged.
        self._consumer.unregisterProducer()
        return res

    def _maybe_fetch_next(self):
        # Only one segment request may be outstanding, and only while we are
        # running and the consumer is not paused.
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        """Request the segment holding the next wanted byte, or finish."""
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        guess_s = ""
        if not have_actual_segment_size:
            guess_s = "probably "
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        self._active_segnum = wanted_segnum
        d, c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        # clear the in-flight markers before handling the result either way
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, wanted_segnum)
        if not have_actual_segment_size:
            # we can retry once: by the time this request resolves, the node
            # knows the real segment size, so the retry will not be a guess
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, segment_args, wanted_segnum):
        """Deliver the part of a fetched segment that we still want.

        :param segment_args: (segment_start, segment, decodetime) tuple as
            delivered by the node's get_segment() Deferred.
        :raises WrongSegmentError: if the segment does not contain our next
            wanted byte.
        """
        # explicit unpacking instead of tuple parameters (PEP 3113)
        (segment_start, segment, decodetime) = segment_args
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment), self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%s"
                    % (self._offset, self._offset+self._size,
                       segment_start, segment_start+len(segment),
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # we may retry if the segnum we asked was based on a guess
            raise WrongSegmentError("I was given the wrong data.")
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._read_ev.update(len(desired_data), 0, 0)
        # note: filenode.DecryptingConsumer is responsible for calling
        # _read_ev.update with how much decrypt_time was consumed
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        f.trap(WrongSegmentError, BadSegmentNumberError)
        # we guessed the segnum wrong: either one that doesn't overlap with
        # the start of our desired region, or one that's beyond the end of
        # the world. Now that we have the right information, we're allowed to
        # retry once.
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        """Terminal errback for a segment request: stop and report f."""
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        if self._deferred.called:
            # Already finished (e.g. the consumer called stopProducing()
            # during write()); firing again would raise AlreadyCalledError.
            # The failure has been logged above.
            return
        self._deferred.errback(f)

    def stopProducing(self):
        """IPushProducer: the consumer is gone; abandon the read.

        Cancels any outstanding segment request and errbacks the start()
        Deferred with DownloadStopped, unless it has already fired.
        """
        log.msg("asked to stopProducing",
                level=log.NOISY, parent=self._lp, umid="XIyL9w")
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request.cancel()
            self._cancel_segment_request = None
        if self._deferred is None or self._deferred.called:
            # never started, or already finished/failed: nothing to report,
            # and a second errback would raise AlreadyCalledError
            return
        e = DownloadStopped("our Consumer called stopProducing()")
        self._deferred.errback(e)

    def pauseProducing(self):
        """IPushProducer: stop issuing new segment requests until resumed."""
        self._hungry = False
        if self._start_pause is None:
            # only the first of repeated pauses starts the clock, so the
            # paused time reported on resume covers the whole pause
            self._start_pause = now()

    def resumeProducing(self):
        """IPushProducer: resume fetching and record how long we were paused."""
        self._hungry = True
        # restart fetching on a later turn rather than re-entering from
        # inside the consumer's call
        eventually(self._maybe_fetch_next)
        if self._start_pause is not None:
            paused = now() - self._start_pause
            self._read_ev.update(0, 0, paused)
            self._start_pause = None