3 from zope.interface import implements
4 from allmydata.interfaces import IDownloadStatus
7 def __init__(self, ev, ds):
10 def update(self, bytes, decrypttime, pausetime):
11 self._ev["bytes_returned"] += bytes
12 self._ev["decrypt_time"] += decrypttime
13 self._ev["paused_time"] += pausetime
14 def finished(self, finishtime):
15 self._ev["finish_time"] = finishtime
16 self._ds.update_last_timestamp(finishtime)
19 def __init__(self, ev, ds):
22 def activate(self, when):
23 if self._ev["active_time"] is None:
24 self._ev["active_time"] = when
25 def deliver(self, when, start, length, decodetime):
26 assert self._ev["active_time"] is not None
27 self._ev["finish_time"] = when
28 self._ev["success"] = True
29 self._ev["decode_time"] = decodetime
30 self._ev["segment_start"] = start
31 self._ev["segment_length"] = length
32 self._ds.update_last_timestamp(when)
33 def error(self, when):
34 self._ev["finish_time"] = when
35 self._ev["success"] = False
36 self._ds.update_last_timestamp(when)
39 def __init__(self, ev, ds):
42 def error(self, when):
43 self._ev["finish_time"] = when
44 self._ev["success"] = False
45 self._ds.update_last_timestamp(when)
46 def finished(self, shnums, when):
47 self._ev["finish_time"] = when
48 self._ev["success"] = True
49 self._ev["response_shnums"] = shnums
50 self._ds.update_last_timestamp(when)
52 class BlockRequestEvent:
53 def __init__(self, ev, ds):
56 def finished(self, received, when):
57 self._ev["finish_time"] = when
58 self._ev["success"] = True
59 self._ev["response_length"] = received
60 self._ds.update_last_timestamp(when)
61 def error(self, when):
62 self._ev["finish_time"] = when
63 self._ev["success"] = False
64 self._ds.update_last_timestamp(when)
68 # There is one DownloadStatus for each CiphertextFileNode. The status
69 # object will keep track of all activity for that node.
70 implements(IDownloadStatus)
71 statusid_counter = itertools.count(0)
73 def __init__(self, storage_index, size):
74 self.storage_index = storage_index
76 self.counter = self.statusid_counter.next()
79 self.first_timestamp = None
80 self.last_timestamp = None
82 # all four of these _events lists are sorted by start_time, because
83 # they are strictly append-only (some elements are later mutated in
84 # place, but none are removed or inserted in the middle).
86 # self.read_events tracks read() requests. It is a list of dicts,
87 # each with the following keys:
88 # start,length (of data requested)
90 # finish_time (None until finished)
91 # bytes_returned (starts at 0, grows as segments are delivered)
92 # decrypt_time (time spent in decrypt, None for ciphertext-only reads)
93 # paused_time (time spent paused by client via pauseProducing)
96 # self.segment_events tracks segment requests and their resolution.
97 # It is a list of dicts:
100 # active_time (None until work has begun)
101 # decode_time (time spent in decode, None until delievered)
102 # finish_time (None until resolved)
103 # success (None until resolved, then boolean)
104 # segment_start (file offset of first byte, None until delivered)
105 # segment_length (None until delivered)
106 self.segment_events = []
108 # self.dyhb_requests tracks "do you have a share" requests and
109 # responses. It is a list of dicts:
112 # success (None until resolved, then boolean)
113 # response_shnums (tuple, None until successful)
114 # finish_time (None until resolved)
115 self.dyhb_requests = []
117 # self.block_requests tracks share-data requests and responses. It is
119 # server (instance of IServer)
121 # start,length, (of data requested)
123 # finish_time (None until resolved)
124 # success (None until resolved, then bool)
125 # response_length (None until success)
126 self.block_requests = []
128 self.known_shares = [] # (serverid, shnum)
132 def add_read_event(self, start, length, when):
133 if self.first_timestamp is None:
134 self.first_timestamp = when
135 r = { "start": start,
143 self.read_events.append(r)
144 return ReadEvent(r, self)
146 def add_segment_request(self, segnum, when):
147 if self.first_timestamp is None:
148 self.first_timestamp = when
149 r = { "segment_number": segnum,
155 "segment_start": None,
156 "segment_length": None,
158 self.segment_events.append(r)
159 return SegmentEvent(r, self)
161 def add_dyhb_request(self, serverid, when):
162 r = { "serverid": serverid,
165 "response_shnums": None,
168 self.dyhb_requests.append(r)
169 return DYHBEvent(r, self)
171 def add_block_request(self, server, shnum, start, length, when):
172 r = { "server": server,
179 "response_length": None,
181 self.block_requests.append(r)
182 return BlockRequestEvent(r, self)
184 def update_last_timestamp(self, when):
185 if self.last_timestamp is None or when > self.last_timestamp:
186 self.last_timestamp = when
188 def add_known_share(self, server, shnum): # XXX use me
189 self.known_shares.append( (server, shnum) )
191 def add_problem(self, p):
192 self.problems.append(p)
194 # IDownloadStatus methods
195 def get_counter(self):
197 def get_storage_index(self):
198 return self.storage_index
201 def get_status(self):
202 # mention all outstanding segment requests
205 outstanding = set([s_ev["segment_number"]
206 for s_ev in self.segment_events
207 if s_ev["finish_time"] is None])
208 errorful = set([s_ev["segment_number"]
209 for s_ev in self.segment_events
210 if s_ev["success"] is False])
212 if len(segnums) == 1:
213 return "segment %s" % list(segnums)[0]
215 return "segments %s" % (",".join([str(i)
216 for i in sorted(segnums)]))
219 error_s = "; errors on %s" % join(errorful)
221 s = "fetching %s" % join(outstanding)
226 def get_progress(self):
227 # measure all read events that aren't completely done, return the
228 # total percentage complete for them
229 if not self.read_events:
231 total_outstanding, total_received = 0, 0
232 for r_ev in self.read_events:
233 if r_ev["finish_time"] is None:
234 total_outstanding += r_ev["length"]
235 total_received += r_ev["bytes_returned"]
236 # else ignore completed requests
237 if not total_outstanding:
239 return 1.0 * total_received / total_outstanding
241 def using_helper(self):
244 def get_active(self):
245 # a download is considered active if it has at least one outstanding
247 for r_ev in self.read_events:
248 if r_ev["finish_time"] is None:
252 def get_started(self):
253 return self.first_timestamp
254 def get_results(self):