]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/segmentation.py
84dddbe98fb5d6a3d30f72d363e96f51dfbed955
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / segmentation.py
1
2 import time
3 now = time.time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
10
11 from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
12
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.

    Usage: construct me, then call start(). start() registers me with the
    consumer as a producer and returns a Deferred. That Deferred fires with
    the consumer once every requested byte has been written to it, and
    errbacks if a segment fetch fails or if the consumer calls
    stopProducing() (with DownloadStopped in the latter case).
    """
    implements(IPushProducer)

    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        """Prepare to read file[offset:offset+size]; nothing is fetched yet.

        node: supplies segments. I use its .segment_size,
              .guessed_segment_size, .get_segment(segnum, logparent),
              ._verifycap.size and ._si_prefix.
        offset, size: the span of the file to deliver.
        consumer: receives the data through .write(); I register and
                  unregister myself on it as its producer.
        read_ev: progress recorder; I call .update() on it with the number
                 of bytes delivered and with the time spent paused.
        logparent: parent handle passed along to every log.msg() call.
        """
        self._node = node
        # False while the consumer has asked us to pause (or we are finished)
        self._hungry = True
        # start() flips this to True; until then _maybe_fetch_next() is a
        # no-op instead of failing on a missing attribute
        self._alive = False
        # segnum of the one outstanding request, or None when idle
        self._active_segnum = None
        # canceller for the outstanding request (has a .cancel() method)
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        assert offset+size <= node._verifycap.size
        self._consumer = consumer
        self._read_ev = read_ev
        # timestamp of the most recent pauseProducing(), None when not paused
        self._start_pause = None
        self._lp = logparent

    def start(self):
        """Begin delivering data to the consumer.

        Returns the Deferred that reports the outcome of the whole read. The
        consumer is unregistered (via _done) whichever way it fires.
        """
        self._alive = True
        self._deferred = defer.Deferred()
        self._deferred.addBoth(self._done)
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _done(self, res):
        """Unregister from the consumer and pass the result through."""
        self._consumer.unregisterProducer()
        return res

    def _maybe_fetch_next(self):
        """Fetch the next segment, unless we are stopped, paused, or a
        request is already outstanding."""
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            # only one segment request at a time
            return
        self._fetch_next()

    def _fetch_next(self):
        """Request the segment that holds file[self._offset], or finish the
        read if nothing is left to deliver."""
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        guess_s = ""
        if not have_actual_segment_size:
            guess_s = "probably "
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        # mark the request as outstanding before issuing it
        self._active_segnum = wanted_segnum
        d,c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        # _request_retired runs first on success and on failure alike, so the
        # "outstanding request" state is cleared before anything else reacts
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, wanted_segnum)
        if not have_actual_segment_size:
            # we can retry once
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        """Forget the outstanding request; pass the result through."""
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, segment_info, wanted_segnum):
        """Deliver the useful part of a fetched segment to the consumer.

        segment_info is the (segment_start, segment, decodetime) tuple that
        the get_segment() Deferred fired with; decodetime is not used here.
        Raises WrongSegmentError if the segment does not contain the first
        byte we still want.
        """
        # Unpack in the body rather than in the signature: tuple parameters
        # were removed from the language by PEP 3113.
        (segment_start, segment, _decodetime) = segment_info
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment),  self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%s"
                    % (self._offset, self._offset+self._size,
                       segment_start, segment_start+len(segment),
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # we may retry if the segnum we asked was based on a guess
            raise WrongSegmentError("I was given the wrong data.")
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        # update our bookkeeping before write(), which may call back into us
        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._read_ev.update(len(desired_data), 0, 0)
        # note: filenode.DecryptingConsumer is responsible for calling
        # _read_ev.update with how much decrypt_time was consumed
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        """Errback used only when the segnum was computed from a guessed
        segment size: try again now that the real size is known. Any other
        failure type is passed on by f.trap()."""
        f.trap(WrongSegmentError, BadSegmentNumberError)
        # we guessed the segnum wrong: either one that doesn't overlap with
        # the start of our desired region, or one that's beyond the end of
        # the world. Now that we have the right information, we're allowed to
        # retry once.
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        """Final errback: log the failure, stop, and errback our Deferred."""
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        self._deferred.errback(f)

    def stopProducing(self):
        """Producer API: abandon the read. Cancels any outstanding segment
        request and errbacks our Deferred with DownloadStopped."""
        log.msg("asked to stopProducing",
                level=log.NOISY, parent=self._lp, umid="XIyL9w")
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request.cancel()
            self._cancel_segment_request = None
        e = DownloadStopped("our Consumer called stopProducing()")
        self._deferred.errback(e)

    def pauseProducing(self):
        """Producer API: stop requesting new segments until resumed, and
        note when the pause began."""
        self._hungry = False
        self._start_pause = now()

    def resumeProducing(self):
        """Producer API: start requesting segments again, and report the
        time spent paused to the read event."""
        self._hungry = True
        # fetch on a later turn rather than from inside the consumer's call
        eventually(self._maybe_fetch_next)
        if self._start_pause is not None:
            paused = now() - self._start_pause
            self._read_ev.update(0, 0, paused)
            self._start_pause = None