
import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap.api import Referenceable, DeadReferenceError, eventually
import allmydata # for __full_version__
from allmydata import interfaces, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
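
    # A minimal usage sketch (the 'storage_broker' and 'lp' names are
    # illustrative, not part of this class):
    #
    #   c = CHKCheckerAndUEBFetcher(storage_broker.get_servers_for_index,
    #                               storage_index, logparent=lp)
    #   d = c.check()
    #   # d fires with False, or with (sharemap, UEB_data, UEB_hash)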

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = dictutil.DictOfSets()
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter(storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: it maps shnum to a bucket-reader rref on the
        # server that holds that share
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            self._sharemap.add(k, peerid)
        self._readers.update([(bucket, peerid)
                              for bucket in buckets.values()])

    def _got_error(self, f):
        if f.check(DeadReferenceError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
        d = rbp.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                         parent=log_number)

        client = helper.parent
        self._storage_broker = client.get_storage_broker()
        self._secret_holder = client._secret_holder
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

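        # The upload pipeline: wait until the fetcher has landed all of the
        # ciphertext on disk, then read it back locally and run the normal
        # CHK encode-and-push.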
        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # determine if we need to upload the file. If so, return
        # (partial_results, self). If not, return (UploadResults, None).
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller is still useful: we need the rest of the
            # ciphertext from it.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, uploadresults):
        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
        r = uploadresults
        v = uri.from_string(r.verifycapstr)
        r.uri_extension_hash = v.uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, v.size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader


class AskUntilSuccessMixin:
    # create me with a _readers list
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d
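
    # Usage sketch (hypothetical subclass, for illustration): a subclass
    # provides self._readers (a list) and self._upload_helper (for logging),
    # then uses self.call() wherever it would use rref.callRemote():
    #
    #   class SizeFetcher(AskUntilSuccessMixin):
    #       def __init__(self, upload_helper):
    #           self._upload_helper = upload_helper
    #           self._readers = []
    #       def fetch_size(self):
    #           # fails over to the next reader if the current one errors
    #           return self.call("get_size")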


class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
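        # start on a later reactor turn (foolscap's eventual-send) rather
        # than synchronously inside the remote_upload call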
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # Read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal is to keep
    # RTT*bandwidth below 10% of the chunk size, to limit the upload
    # bandwidth lost to this protocol being non-windowing. Too large,
    # however, means more memory consumption for both ends. Something that
    # can be transferred in, say, 10 seconds sounds about right. On my home
    # DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024
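    # (As a rule of thumb from the above: CHUNK_SIZE is roughly
    # upstream_bandwidth * target_transfer_time, e.g. 50kBps * 10s = 500kB.)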

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None
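
    # The same pattern in miniature (an illustrative sketch, not used by
    # this class): each step gets its own short-lived Deferred, and only
    # 'done' lives for the whole transfer:
    #
    #   def loop(done):
    #       d = defer.maybeDeferred(step)
    #       def _next(finished):
    #           if finished:
    #               done.callback(None)
    #           else:
    #               loop(done)  # a fresh Deferred; the old one can retire
    #       d.addCallback(_next)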

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None, history=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.resumes": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        self._history = history
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
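        # I return (UploadResults, None) if the file is already present in
        # the grid, otherwise (partial UploadResults, RICHKUploadHelper) so
        # the caller can push us its ciphertext.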
        self.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there are
            # fewer than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
                self._add_upload(uh)
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f
        d.addErrback(_err)
        return d

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        sb = self.parent.get_storage_broker()
        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = sharemap
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def _add_upload(self, uh):
        self._all_uploads[uh] = None
        if self._history:
            s = uh.get_upload_status()
            self._history.notify_helper_upload(s)

    def upload_finished(self, storage_index, size):
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)