import os, stat, time, weakref
from zope.interface import implements
from twisted.internet import defer
from foolscap.api import Referenceable, DeadReferenceError, eventually
import allmydata # for __full_version__
from allmydata import interfaces, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
from allmydata.util import log, observer, fileutil, hashutil, dictutil


class NotEnoughWritersError(Exception):
    pass


class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
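
    # A hedged usage sketch (the Helper's _check_chk below does exactly
    # this; 'peer_getter' is assumed to be something like
    # StorageFarmBroker.get_servers_for_psi):
    #
    #   c = CHKCheckerAndUEBFetcher(peer_getter, storage_index, logparent)
    #   d = c.check()
    #   def _checked(res):
    #       if res:
    #           (sharemap, ueb_data, ueb_hash) = res  # file is healthy
    #       else:
    #           pass  # file not (fully) present; caller must upload it
    #   d.addCallback(_checked)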

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = dictutil.DictOfSets()
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for s in self._peer_getter(storage_index):
            d = s.get_rref().callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(s,))
            dl.append(d)
        return defer.DeferredList(dl)

    def _got_response(self, buckets, server):
        # buckets is a dict: maps shnum to an rref of the bucket holding
        # that share on this server
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (server.get_name(), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            self._sharemap.add(k, server.get_serverid())
        self._readers.update([(bucket, server)
                              for bucket in buckets.values()])

    def _got_error(self, f):
        if f.check(DeadReferenceError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, server = self._readers.pop()
        rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index))
        d = rbp.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, storage_index,
                 helper, storage_broker, secret_holder,
                 incoming_file, encoding_file,
                 log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                         parent=log_number)

        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        self._started = time.time()
        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # figure out how much ciphertext we already have
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
        elif os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller can supply the rest.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
        else:
            # we don't remember uploading this file
            self.log("no ciphertext yet", level=log.NOISY)

        # let our fetcher pull ciphertext from the reader,
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()
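
    # Note (a non-authoritative observation from the code above): several
    # AssistedUploaders may call remote_upload() for the same storage index;
    # each call adds another ciphertext source to the fetcher, and every
    # caller's Deferred fires with the same HelperUploadResults when the
    # single underlying upload completes.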

    def _finished(self, ur):
        assert interfaces.IUploadResults.providedBy(ur), ur
        vcapstr = ur.get_verifycapstr()
        precondition(isinstance(vcapstr, str), vcapstr)
        v = uri.from_string(vcapstr)
        f_times = self._fetcher.get_times()

        hur = upload.HelperUploadResults()
        hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
                       "total_fetch": f_times["total"],
                       }
        for key, val in ur.get_timings().items():
            hur.timings[key] = val
        hur.uri_extension_hash = v.uri_extension_hash
        hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        hur.preexisting_shares = ur.get_preexisting_shares()
        # hur.sharemap needs to be {shnum: set(serverid)}
        hur.sharemap = {}
        for shnum, servers in ur.get_sharemap().items():
            hur.sharemap[shnum] = set([s.get_serverid() for s in servers])
        # and hur.servermap needs to be {serverid: set(shnum)}
        hur.servermap = {}
        for server, shnums in ur.get_servermap().items():
            hur.servermap[server.get_serverid()] = set(shnums)
        hur.pushed_shares = ur.get_pushed_shares()
        hur.file_size = ur.get_file_size()
        hur.uri_extension_data = ur.get_uri_extension_data()
        hur.verifycapstr = vcapstr

        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(hur)
        self._helper.upload_finished(self._storage_index, v.size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader


class AskUntilSuccessMixin:
    # create me with a _readers list
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d
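
    # A minimal illustration of the failover contract (hypothetical
    # 'reader_a'/'reader_b' objects, each exposing callRemote() like an
    # RIEncryptedUploadable):
    #
    #   self.add_reader(reader_a)
    #   self.add_reader(reader_b)
    #   d = self.call("get_size")
    #
    # If reader_a's remote call fails, it is dropped from self._readers and
    # the same call is retried on reader_b; once no readers remain, the next
    # call() raises NotEnoughWritersError.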

class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """
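
    # Flow sketch (derived from the methods below): ciphertext accumulates
    # in 'incoming_file' while fetching; _done() then os.rename()s it to
    # 'encoding_file', and _done2() fires when_done(), at which point a
    # LocalCiphertextReader can take over.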

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # XXX the following comment is probably stale, since
            # LocalCiphertextReader.get_plaintext_hashtree_leaves does not
            # exist.
            #
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
        else:
            # first, find out how large the file is going to be
            d = self.call("get_size")
            d.addCallback(self._got_size)
            d.addCallback(self._start_reading)
            d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # Read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal should be to
    # keep RTT*bandwidth below 10% of the chunk size, to reduce the upload
    # bandwidth lost because this protocol is non-windowing. Too large,
    # however, means more memory consumption for both ends. Something that
    # can be transferred in, say, 10 seconds sounds about right. On my home
    # DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024
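    # (Worked example for the 10% target above: on a 10kBps line with a
    # 200ms RTT, RTT*bandwidth is about 2kB, i.e. 4% of a 50kB chunk.)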

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
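        # (i.e. the naive approach of chaining each chunk's Deferred to the
        # next via addCallback would build one long callback chain and blow
        # the stack when it finally unwinds.)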
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
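        # IEncryptedUploadable.read_encrypted() is defined to return a list
        # of strings, so wrap the single read in a one-element list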
        return d

    def close(self):
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")



class Helper(Referenceable):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                 { },
                "application-version": str(allmydata.__full_version__),
                }
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, storage_broker, secret_holder,
                 stats_provider, history):
        self._basedir = basedir
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.resumes": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        self._history = history

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return log.msg(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        self.count("chk_upload_helper.upload_requests")
        lp = self.log(format="helper: upload_chk query for SI %(si)s",
                      si=si_b2a(storage_index))
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return (None, uh)

        d = self._check_chk(storage_index, lp)
        d.addCallback(self._did_chk_check, storage_index, lp)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f
        d.addErrback(_err)
        return d
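
    # So remote_upload_chk() resolves to a (results, upload_helper) pair:
    # (HelperUploadResults, None) when the file is already in the grid, or
    # (None, CHKUploadHelper) when the caller must drive an upload (see
    # _did_chk_check below).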

    def _check_chk(self, storage_index, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        sb = self._storage_broker
        c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                hur = upload.HelperUploadResults()
                hur.uri_extension_hash = ueb_hash
                hur.sharemap = sharemap
                hur.uri_extension_data = ueb_data
                hur.preexisting_shares = len(sharemap)
                hur.pushed_shares = 0
                return hur
            return None
        d.addCallback(_checked)
        return d

    def _did_chk_check(self, already_present, storage_index, lp):
        if already_present:
            # the necessary results are placed in the UploadResults
            self.count("chk_upload_helper.upload_already_present")
            self.log("file already found in grid", parent=lp)
            return (already_present, None)

        self.count("chk_upload_helper.upload_need_upload")
        # the file is not present in the grid, by which we mean there are
        # fewer than 'N' shares available.
        self.log("unable to find file in the grid", parent=lp,
                 level=log.NOISY)
        # We need an upload helper. Check our active uploads again in
        # case there was a race.
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
        else:
            self.log("creating new upload helper", parent=lp)
            uh = self._make_chk_upload_helper(storage_index, lp)
            self._active_uploads[storage_index] = uh
            self._add_upload(uh)
        return (None, uh)

    def _make_chk_upload_helper(self, storage_index, lp):
        si_s = si_b2a(storage_index)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        uh = CHKUploadHelper(storage_index, self,
                             self._storage_broker,
                             self._secret_holder,
                             incoming_file, encoding_file,
                             lp)
        return uh

    def _add_upload(self, uh):
        self._all_uploads[uh] = None
        if self._history:
            s = uh.get_upload_status()
            self._history.notify_helper_upload(s)

    def upload_finished(self, storage_index, size):
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)