import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap.api import Referenceable, DeadReferenceError, eventually
import allmydata # for __full_version__
from allmydata import interfaces, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
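    # A rough usage sketch (illustrative only; the real caller is
    # Helper._check_for_chk_already_in_grid, below):
    #
    #   c = CHKCheckerAndUEBFetcher(peer_getter, storage_index, logparent)
    #   d = c.check()
    #   def _checked(res):
    #       if res:
    #           (sharemap, ueb_data, ueb_hash) = res # healthy: skip upload
    #       else:
    #           pass # res is False: fewer than N shares, upload is needed
    #   d.addCallback(_checked)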

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = dictutil.DictOfSets()
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter(storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: it maps shnum to an rref of the bucket that
        # holds that share
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            self._sharemap.add(k, peerid)
        self._readers.update([(bucket, peerid)
                              for bucket in buckets.values()])

    def _got_error(self, f):
        if f.check(DeadReferenceError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
        d = rbp.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }
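
    # VERSION above is what clients receive from remote_get_version(): it
    # advertises which versions of the helper upload protocol this server
    # speaks.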

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                         parent=log_number)

        self._client = helper.parent
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # Decide whether the file still needs to be uploaded. All three
        # branches below return (partial UploadResults, self); if the file
        # had already been fully present in the grid, the Helper would have
        # returned (UploadResults, None) without consulting us.
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller might be useful.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, uploadresults):
        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
        r = uploadresults
        v = uri.from_string(r.verifycapstr)
        r.uri_extension_hash = v.uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, v.size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader

class AskUntilSuccessMixin:
    # create me with a _readers list
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d

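# A minimal sketch of the failover that AskUntilSuccessMixin provides
# (illustrative; subclasses must supply self._readers and
# self._upload_helper):
#
#   d = fetcher.call("read_encrypted", offset, length)
#
# If the current reader's callRemote() fails, that reader is dropped and the
# same call is retried on the next one; only when no readers remain does the
# call fail with NotEnoughWritersError.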
class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """
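    # A rough lifecycle sketch (illustrative only):
    #
    #   fetcher = CHKCiphertextFetcher(uh, incoming_file, encoding_file, lp)
    #   fetcher.add_reader(rref) # an RIEncryptedUploadable; starts the fetch
    #   d = fetcher.when_done()  # fires with None once the ciphertext has
    #                            # been moved to encoding_file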

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal should be to
    # keep RTT*bandwidth below 10% of the chunk size, to reduce the upload
    # bandwidth lost because this protocol is non-windowing. Too large,
    # however, means more memory consumption for both ends. Something that
    # can be transferred in, say, 10 seconds sounds about right. On my home
    # DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024
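
    # To make the RTT*bandwidth rule above concrete (an illustrative
    # calculation): on a 10kBps line with a 500ms RTT, each round-trip
    # wastes 10kBps * 0.5s = 5kB of capacity, which is exactly 10% of a
    # 50kB chunk, so roughly 10% of the upstream bandwidth is lost to
    # waiting between chunks.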

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None
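
    # The _loop() pattern in miniature (illustrative): each iteration gets a
    # fresh short-lived Deferred, and only the long-lived 'fire_when_done'
    # spans the whole transfer, so no deep callback chain can build up:
    #
    #   def loop(done):
    #       d = defer.maybeDeferred(step)
    #       d.addCallback(lambda finished:
    #                     done.callback(None) if finished else loop(done))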

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
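        # IEncryptedUploadable.read_encrypted() is specified to return a
        # list of strings, so the single read() result is wrapped in a
        # one-element list below.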
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.resumes": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        service.MultiService.__init__(self)
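
    # Layout note (from __init__ above): partially-fetched ciphertext lives
    # in BASEDIR/CHK_incoming/SI and is renamed into BASEDIR/CHK_encoding/SI
    # once the fetch completes; see CHKCiphertextFetcher._done().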

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        self.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there are
            # fewer than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
                self._add_upload(uh)
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f
        d.addErrback(_err)
        return d

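    # A rough sketch of the client side of the remote_upload_chk() exchange
    # above (illustrative only; the real client is the AssistedUploader):
    #
    #   d = helper_rref.callRemote("upload_chk", storage_index)
    #   def _got((upload_results, upload_helper)):
    #       if upload_helper is None:
    #           return upload_results # file was already in the grid
    #       # otherwise hand over our RIEncryptedUploadable:
    #       return upload_helper.callRemote("upload", reader)
    #   d.addCallback(_got)
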
    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        sb = self.parent.get_storage_broker()
        c = CHKCheckerAndUEBFetcher(sb.get_servers, storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = sharemap
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def _add_upload(self, uh):
        self._all_uploads[uh] = None
        s = uh.get_upload_status()
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def upload_finished(self, storage_index, size):
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)

    def get_all_upload_statuses(self):
        return self._all_upload_statuses