]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/downloader/status.py
0bf3f715e664b27349296025da1c946b7c8b994c
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / downloader / status.py
1
2 import itertools
3 from zope.interface import implements
4 from allmydata.interfaces import IDownloadStatus
5
class ReadEvent:
    """Handle used to record progress on a single read() request.

    `ev` is the mutable event dict that holds the request's counters and
    timestamps; `ds` is the status object that is told about the finish
    time via update_last_timestamp().
    """

    def __init__(self, ev, ds):
        self._ev, self._ds = ev, ds

    def update(self, bytes, decrypttime, pausetime):
        """Add the given amounts to the event's running totals."""
        record = self._ev
        increments = (
            ("bytes_returned", bytes),
            ("decrypt_time", decrypttime),
            ("paused_time", pausetime),
        )
        for key, amount in increments:
            record[key] += amount

    def finished(self, finishtime):
        """Stamp the event with its finish time and notify the status."""
        record = self._ev
        record["finish_time"] = finishtime
        self._ds.update_last_timestamp(finishtime)
17
class SegmentEvent:
    """Handle used to record the lifecycle of one segment request.

    `ev` is the mutable event dict for the request; `ds` is the status
    object notified (via update_last_timestamp) whenever the request is
    resolved.
    """

    def __init__(self, ev, ds):
        self._ev, self._ds = ev, ds

    def activate(self, when):
        """Record when work began; only the first call has any effect."""
        record = self._ev
        if record["active_time"] is not None:
            return
        record["active_time"] = when

    def deliver(self, when, start, length, decodetime):
        """Mark the segment as successfully delivered at time `when`."""
        record = self._ev
        assert record["active_time"] is not None
        record["finish_time"] = when
        record["success"] = True
        record["decode_time"] = decodetime
        record["segment_start"], record["segment_length"] = start, length
        self._ds.update_last_timestamp(when)

    def error(self, when):
        """Mark the segment request as failed at time `when`."""
        record = self._ev
        record["finish_time"] = when
        record["success"] = False
        self._ds.update_last_timestamp(when)
37
class DYHBEvent:
    """Handle used to record the outcome of one DYHB request.

    `ev` is the mutable event dict for the request; `ds` is the status
    object notified (via update_last_timestamp) when the request resolves.
    """

    def __init__(self, ev, ds):
        self._ev, self._ds = ev, ds

    def _resolve(self, when, success):
        # shared first step of both outcomes: stamp the time and the verdict
        self._ev["finish_time"] = when
        self._ev["success"] = success

    def error(self, when):
        """Mark the request as failed at time `when`."""
        self._resolve(when, False)
        self._ds.update_last_timestamp(when)

    def finished(self, shnums, when):
        """Mark the request as successful and store the reported shnums."""
        self._resolve(when, True)
        self._ev["response_shnums"] = shnums
        self._ds.update_last_timestamp(when)
51
class BlockRequestEvent:
    """Handle used to record the outcome of one share-data request.

    `ev` is the mutable event dict for the request; `ds` is the status
    object notified (via update_last_timestamp) when the request resolves.
    """

    def __init__(self, ev, ds):
        self._ev, self._ds = ev, ds

    def finished(self, received, when):
        """Mark the request as successful, noting how much was received."""
        entry = self._ev
        entry["finish_time"], entry["success"] = when, True
        entry["response_length"] = received
        self._ds.update_last_timestamp(when)

    def error(self, when):
        """Mark the request as failed at time `when`."""
        entry = self._ev
        entry["finish_time"], entry["success"] = when, False
        self._ds.update_last_timestamp(when)
65
66
class DownloadStatus:
    """Activity record for one CiphertextFileNode.

    There is one DownloadStatus for each CiphertextFileNode. The status
    object keeps track of all activity for that node: read() requests,
    segment requests, DYHB requests and block requests. Each add_*
    method appends a new event dict to the matching list and returns a
    small handle object that later fills in the outcome.
    """
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self, storage_index, size):
        """Create an empty status record for the given storage index/size."""
        self.storage_index = storage_index
        self.size = size
        self.counter = self.statusid_counter.next()
        self.helper = False

        self.first_timestamp = None
        self.last_timestamp = None

        # all four of these _events lists are sorted by start_time, because
        # they are strictly append-only (some elements are later mutated in
        # place, but none are removed or inserted in the middle).

        # self.read_events tracks read() requests. It is a list of dicts,
        # each with the following keys:
        #  start,length  (of data requested)
        #  start_time
        #  finish_time (None until finished)
        #  bytes_returned (starts at 0, grows as segments are delivered)
        #  decrypt_time (cumulative time spent in decrypt, starts at 0)
        #  paused_time (time spent paused by client via pauseProducing)
        self.read_events = []

        # self.segment_events tracks segment requests and their resolution.
        # It is a list of dicts:
        #  segment_number
        #  start_time
        #  active_time (None until work has begun)
        #  decode_time (time spent in decode, None until delivered)
        #  finish_time (None until resolved)
        #  success (None until resolved, then boolean)
        #  segment_start (file offset of first byte, None until delivered)
        #  segment_length (None until delivered)
        self.segment_events = []

        # self.dyhb_requests tracks "do you have a share" requests and
        # responses. It is a list of dicts:
        #  serverid (binary)
        #  start_time
        #  success (None until resolved, then boolean)
        #  response_shnums (tuple, None until successful)
        #  finish_time (None until resolved)
        self.dyhb_requests = []

        # self.block_requests tracks share-data requests and responses. It is
        # a list of dicts:
        #  serverid (binary),
        #  shnum,
        #  start,length,  (of data requested)
        #  start_time
        #  finish_time (None until resolved)
        #  success (None until resolved, then bool)
        #  response_length (None until success)
        self.block_requests = []

        self.known_shares = [] # (serverid, shnum)
        self.problems = []


    def add_read_event(self, start, length, when):
        """Record a new read() request and return its ReadEvent handle.

        The first read or segment event also fixes first_timestamp.
        """
        if self.first_timestamp is None:
            self.first_timestamp = when
        r = { "start": start,
              "length": length,
              "start_time": when,
              "finish_time": None,
              "bytes_returned": 0,
              "decrypt_time": 0,
              "paused_time": 0,
              }
        self.read_events.append(r)
        return ReadEvent(r, self)

    def add_segment_request(self, segnum, when):
        """Record a new segment request and return its SegmentEvent handle.

        The first read or segment event also fixes first_timestamp.
        """
        if self.first_timestamp is None:
            self.first_timestamp = when
        r = { "segment_number": segnum,
              "start_time": when,
              "active_time": None,
              "finish_time": None,
              "success": None,
              "decode_time": None,
              "segment_start": None,
              "segment_length": None,
              }
        self.segment_events.append(r)
        return SegmentEvent(r, self)

    def add_dyhb_request(self, serverid, when):
        """Record a new DYHB request and return its DYHBEvent handle."""
        r = { "serverid": serverid,
              "start_time": when,
              "success": None,
              "response_shnums": None,
              "finish_time": None,
              }
        self.dyhb_requests.append(r)
        return DYHBEvent(r, self)

    def add_block_request(self, serverid, shnum, start, length, when):
        """Record a new block request and return its BlockRequestEvent."""
        r = { "serverid": serverid,
              "shnum": shnum,
              "start": start,
              "length": length,
              "start_time": when,
              "finish_time": None,
              "success": None,
              "response_length": None,
              }
        self.block_requests.append(r)
        return BlockRequestEvent(r, self)

    def update_last_timestamp(self, when):
        """Advance last_timestamp to `when` if it is later (or unset)."""
        if self.last_timestamp is None or when > self.last_timestamp:
            self.last_timestamp = when

    def add_known_share(self, serverid, shnum):
        """Remember that `serverid` is known to hold share `shnum`."""
        self.known_shares.append( (serverid, shnum) )

    def add_problem(self, p):
        """Append `p` to the list of recorded problems."""
        self.problems.append(p)

    # IDownloadStatus methods
    def get_counter(self):
        """Return the integer assigned to this status at creation."""
        return self.counter

    def get_storage_index(self):
        """Return the storage index given at creation."""
        return self.storage_index

    def get_size(self):
        """Return the size given at creation."""
        return self.size

    def get_status(self):
        """Return a one-line summary string of segment activity.

        The result is "idle" or "fetching segment(s) ...", naming every
        segment request that has no finish_time yet, followed by
        "; errors on segment(s) ..." if any segment request failed.
        """
        # mention all outstanding segment requests
        outstanding = set([s_ev["segment_number"]
                           for s_ev in self.segment_events
                           if s_ev["finish_time"] is None])
        errorful = set([s_ev["segment_number"]
                        for s_ev in self.segment_events
                        if s_ev["success"] is False])
        def join(segnums):
            if len(segnums) == 1:
                return "segment %s" % list(segnums)[0]
            else:
                return "segments %s" % (",".join([str(i)
                                                  for i in sorted(segnums)]))
        error_s = ""
        if errorful:
            error_s = "; errors on %s" % join(errorful)
        if outstanding:
            s = "fetching %s" % join(outstanding)
        else:
            s = "idle"
        return s + error_s

    def get_progress(self):
        """Return the fraction complete across all unfinished reads.

        Returns 0.0 when no read has ever been requested, and 1.0 when
        there are reads but none of them is still outstanding (or the
        outstanding ones request zero bytes).
        """
        # measure all read events that aren't completely done, return the
        # total percentage complete for them
        if not self.read_events:
            return 0.0
        total_outstanding, total_received = 0, 0
        for r_ev in self.read_events:
            if r_ev["finish_time"] is None:
                total_outstanding += r_ev["length"]
                total_received += r_ev["bytes_returned"]
            # else ignore completed requests
        if not total_outstanding:
            return 1.0
        return 1.0 * total_received / total_outstanding

    def using_helper(self):
        """Always report that no helper is in use."""
        return False

    def get_active(self):
        """Return True if at least one read() call is still outstanding."""
        # a download is considered active if it has at least one outstanding
        # read() call. Each read event is a dict, so the finish time must be
        # looked up by key: unpacking the dict would only yield its key
        # names, which are never None.
        for r_ev in self.read_events:
            if r_ev["finish_time"] is None:
                return True
        return False

    def get_started(self):
        """Return the time of the first read or segment event, or None."""
        return self.first_timestamp

    def get_results(self):
        """Return None; results are not implemented yet."""
        return None # TODO