git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/segmentation.py
Rewrite immutable downloader (#798). This patch adds the new downloader itself.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / segmentation.py
1
2 import time
3 now = time.time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.internet.interfaces import IPushProducer
7 from foolscap.api import eventually
8 from allmydata.util import log
9 from allmydata.util.spans import overlap
10
11 from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
12
class Segmentation:
    """I satisfy one offset+size read of a file on behalf of one consumer.

    I decide which segment contains the next byte I still owe, ask my node
    for it, cut the returned segment down to the part that falls inside the
    requested span, and write that part to the consumer. I register with the
    consumer as an IPushProducer and keep at most one segment request
    outstanding, so the consumer's pause/resume calls throttle the download.
    """
    implements(IPushProducer)

    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        """Remember the read request; nothing is fetched until start().

        node: provides get_segment(), segment_size, guessed_segment_size,
              _verifycap.size and _si_prefix
        offset, size: the span file[offset:offset+size] to deliver; it must
                      not extend past node._verifycap.size
        consumer: receives registerProducer/write/unregisterProducer calls
        read_ev: its update() is told about bytes delivered and time paused
        logparent: used as parent= on my log messages and handed to
                   get_segment()
        """
        assert offset+size <= node._verifycap.size
        self._node = node
        self._consumer = consumer
        self._read_ev = read_ev
        self._lp = logparent
        # The span still owed to the consumer is always
        # file[self._offset:self._offset+self._size]. Both values are
        # adjusted every time data is delivered.
        self._offset = offset
        self._size = size
        # flow-control and request-tracking state
        self._hungry = True
        self._start_pause = None
        self._active_segnum = None
        self._cancel_segment_request = None

    def start(self):
        """Register with the consumer and begin fetching.

        Returns a Deferred that fires with the consumer once the whole span
        has been written, or errbacks if a fetch fails or the consumer calls
        stopProducing().
        """
        self._alive = True
        self._deferred = defer.Deferred()
        # second argument True: we are a push ("streaming") producer
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        """Start the next segment request, unless we are stopped, paused, or
        already waiting for a segment."""
        wants_data = self._alive and self._hungry
        idle = self._active_segnum is None
        if wants_data and idle:
            self._fetch_next()

    def _fetch_next(self):
        """Finish the read if nothing is left; otherwise request the segment
        that should contain self._offset."""
        if self._size == 0:
            # Everything has been delivered: shut down and report success.
            self._hungry = False
            self._alive = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return

        node = self._node
        size_is_known = node.segment_size is not None
        if size_is_known:
            qualifier = ""
        else:
            qualifier = "probably "
        segsize = node.segment_size or node.guessed_segment_size
        if self._offset == 0:
            # offset 0 always lives in segment 0, whatever the segment size
            segnum = 0
        else:
            # only a guess when the real segment size is not known yet
            segnum = self._offset // segsize
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=qualifier, segnum=segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")

        self._active_segnum = segnum
        d, canceller = node.get_segment(segnum, self._lp)
        self._cancel_segment_request = canceller
        # Callback order matters: first clear the "request outstanding"
        # state, then consume the segment, then (only when segnum was a
        # guess) allow one retry, and finally report any remaining failure.
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, segnum)
        if not size_is_known:
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        """The outstanding request has finished, well or badly: forget it and
        pass the result (or Failure) through untouched."""
        self._cancel_segment_request = None
        self._active_segnum = None
        return res

    def _got_segment(self, result, wanted_segnum):
        """Deliver the useful part of a segment to the consumer.

        result is the (segment_start, segment, decodetime) tuple produced by
        the segment request. I raise WrongSegmentError if the segment does
        not contain the first byte I still need.
        """
        (segment_start, segment, decodetime) = result
        self._cancel_segment_request = None
        # we were handed file[segment_start:segment_end]
        # we still owe   file[self._offset:want_end]
        segment_end = segment_start + len(segment)
        want_end = self._offset + self._size
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=want_end,
                segstart=segment_start, segend=segment_end,
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        # when there is an overlap it is file[o[0]:o[0]+o[1]]
        o = overlap(segment_start, len(segment),  self._offset, self._size)
        if not o or o[0] != self._offset:
            # The segment does not cover our first wanted byte, so none of
            # it can be delivered in order.
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%s"
                    % (self._offset, want_end,
                       segment_start, segment_end,
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # _fetch_next arranged a retry if the segnum was only a guess
            raise WrongSegmentError("I was given the wrong data.")

        skip = self._offset - segment_start
        chunk = segment[skip:skip+o[1]]
        delivered = len(chunk)

        # Update our bookkeeping *before* writing: the consumer may call
        # pauseProducing() (clearing self._hungry) from inside write().
        self._offset += delivered
        self._size -= delivered
        self._consumer.write(chunk)
        self._read_ev.update(delivered, 0, 0)
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        """Errback used only when the segment number was guessed.

        A WrongSegmentError or BadSegmentNumberError means the guess missed:
        either the segment did not cover the start of our span, or it lies
        beyond the end of the file. The real segment size is expected to be
        known by now, so try again. Any other failure is passed on.
        """
        f.trap(WrongSegmentError, BadSegmentNumberError)
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        """Final errback: log the failure, shut down, and errback the
        Deferred returned by start()."""
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._hungry = False
        self._alive = False
        self._consumer.unregisterProducer()
        self._deferred.errback(f)

    def stopProducing(self):
        """The consumer wants no more data: cancel any outstanding segment
        request and errback start()'s Deferred with DownloadStopped."""
        self._alive = False
        self._hungry = False
        canceller = self._cancel_segment_request
        if canceller:
            canceller.cancel()
            self._cancel_segment_request = None
        stopped = DownloadStopped("our Consumer called stopProducing()")
        self._deferred.errback(stopped)

    def pauseProducing(self):
        """The consumer is full: stop starting new requests and note when the
        pause began."""
        self._start_pause = now()
        self._hungry = False

    def resumeProducing(self):
        """The consumer wants data again: schedule the next fetch and report
        how long we were paused to the read event."""
        self._hungry = True
        # fetch on a later turn rather than re-entering the consumer's call
        eventually(self._maybe_fetch_next)
        began = self._start_pause
        if began is None:
            return
        self._read_ev.update(0, 0, now() - began)
        self._start_pause = None