]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/offloaded.py
c8da456de6098b4b7c9d2a7134817a16cbb226e8
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / offloaded.py
1
2 import os, stat, time, weakref
3 from zope.interface import implements
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap.api import Referenceable, DeadReferenceError, eventually
7 import allmydata # for __full_version__
8 from allmydata import interfaces, uri
9 from allmydata.storage.server import si_b2a
10 from allmydata.immutable import upload
11 from allmydata.immutable.layout import ReadBucketProxy
12 from allmydata.util.assertutil import precondition
13 from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil
14
15
class NotEnoughWritersError(Exception):
    """Raised when no assisted uploader is left to service a remote call."""
18
19
class CHKCheckerAndUEBFetcher:
    """I find out whether a file is already present in the grid, and I
    retrieve its URI Extension Block (UEB) along the way. An uploading
    client can use the UEB to skip the work of encryption and encoding.

    check() returns a Deferred. It fires with False when the file is not
    completely healthy, i.e. when fewer than 'N' (total_shares) distinct
    shares were found, or when no UEB could be fetched.

    When the file is completely healthy, it fires with a tuple of
    (sharemap, UEB_data, UEB_hash).
    """

    def __init__(self, peer_getter, storage_index, logparent=None):
        # peer_getter is called as peer_getter("storage", storage_index) and
        # must yield (peerid, server-reference) pairs.
        self._peer_getter = peer_getter
        self._storage_index = storage_index
        self._logparent = logparent
        # share numbers seen on any server
        self._found_shares = set()
        # share number -> set of peerids holding that share
        self._sharemap = dictutil.DictOfSets()
        # (bucket reference, peerid) pairs from which the UEB can be read
        self._readers = set()
        self._ueb_data = None
        self._ueb_hash = None

    def log(self, *args, **kwargs):
        """Emit a log message, defaulting the facility and parent."""
        kwargs.setdefault('facility', "tahoe.helper.chk.checkandUEBfetch")
        kwargs.setdefault('parent', self._logparent)
        return log.msg(*args, **kwargs)

    def check(self):
        """Run the whole check: query servers, fetch the UEB, then decide."""
        chain = self._get_all_shareholders(self._storage_index)
        chain.addCallback(self._get_uri_extension)
        chain.addCallback(self._done)
        return chain

    def _get_all_shareholders(self, storage_index):
        """Ask every candidate server which shares it holds for this SI."""
        def _query(peerid, server):
            d = server.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            return d
        queries = [_query(peerid, server)
                   for (peerid, server)
                   in self._peer_getter("storage", storage_index)]
        return defer.DeferredList(queries)

    def _got_response(self, buckets, peerid):
        # buckets is a dict: maps shnum to an rref of the bucket on the
        # server that answered
        shnums_s = ",".join(str(shnum) for shnum in buckets)
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets)
        for (shnum, bucket) in buckets.items():
            self._sharemap.add(shnum, peerid)
            self._readers.add((bucket, peerid))

    def _got_error(self, f):
        # a server that went away is unremarkable; anything else is logged.
        # Either way the error is swallowed so the other queries still count.
        if not f.check(DeadReferenceError):
            log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an error,
        # declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return None
        (bucket, peerid) = self._readers.pop()
        proxy = ReadBucketProxy(bucket, peerid, si_b2a(self._storage_index))
        d = proxy.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d

    def _got_uri_extension(self, ueb):
        """Remember both the hash and the unpacked form of the UEB."""
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        """Produce the final verdict from what was gathered."""
        if not self._ueb_data:
            # no UEB could be obtained
            self.log("unable to find UEB data, file not found in grid",
                     level=log.NOISY)
            return False

        found = len(self._found_shares)
        total = self._ueb_data['total_shares']
        self.log(format="got %(found)d shares of %(total)d",
                 found=found, total=total, level=log.NOISY)
        if found < total:
            # not all shares are present in the grid
            self.log("not enough to qualify, file not found in grid",
                     level=log.NOISY)
            return False

        # all shares are present
        self.log("all shares present, file is found in grid",
                 level=log.NOISY)
        return (self._sharemap, self._ueb_data, self._ueb_hash)
125
126
class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side partner of AssistedUploader. Peer
    selection, encoding, and share pushing are my job. The ciphertext I
    work on is read from the remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = {
        "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1": {},
        "application-version": str(allmydata.__full_version__),
    }

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results

        status = upload.UploadStatus()
        status.set_helper(False)
        status.set_storage_index(storage_index)
        status.set_status("fetching ciphertext")
        status.set_progress(0, 1.0)
        self._upload_status = status

        helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                   parent=log_number)

        self._client = helper.parent
        # the fetcher pulls ciphertext onto disk; the reader later feeds that
        # on-disk ciphertext to the encoder
        self._fetcher = CHKCiphertextFetcher(self, incoming_file,
                                             encoding_file, log_number)
        self._reader = LocalCiphertextReader(self, storage_index,
                                             encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        def _open_local_reader(_ignored):
            return self._reader.start()

        def _begin_encoding(_ignored):
            return self.start_encrypted(self._reader)

        # fetch everything, then encode+push, then report the outcome
        pipeline = self._fetcher.when_done()
        pipeline.addCallback(_open_local_reader)
        pipeline.addCallback(_begin_encoding)
        pipeline.addCallback(self._finished)
        pipeline.addErrback(self._failed)

    def log(self, *args, **kwargs):
        """Log through CHKUploader.log, defaulting the facility."""
        kwargs.setdefault('facility', "tahoe.helper.chk")
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        """Record the start time and return (results, self).

        The state of the on-disk ciphertext only affects which message is
        logged; the return value is the same in every case.
        """
        self._started = time.time()
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
        elif os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller might be useful.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
        else:
            # we don't remember uploading this file
            self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        """Accept a remote reader and return a Deferred for the results.

        reader is an RIEncryptedUploadable. The returned Deferred fires
        when the upload has finished (or failed).
        """
        # the fetcher pulls ciphertext from the reader ...
        self._fetcher.add_reader(reader)
        # ... and the local reader asks it for hashes
        self._reader.add_reader(reader)
        # the client is informed once the upload has finished
        return self._finished_observers.when_fired()

    def _finished(self, uploadresults):
        """Success path: fill in the results, clean up, notify everyone."""
        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
        verifycap = uri.from_string(uploadresults.verifycapstr)
        uploadresults.uri_extension_hash = verifycap.uri_extension_hash
        fetch_times = self._fetcher.get_times()
        uploadresults.timings["cumulative_fetch"] = fetch_times["cumulative_fetch"]
        uploadresults.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        uploadresults.timings["total_fetch"] = fetch_times["total"]
        self._reader.close()
        # the ciphertext is no longer needed once the shares are pushed
        os.unlink(self._encoding_file)
        self._finished_observers.fire(uploadresults)
        self._helper.upload_finished(self._storage_index, verifycap.size)
        del self._reader

    def _failed(self, f):
        """Failure path: log, pass the failure to waiters, deregister."""
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        # size=0 tells the Helper that the upload failed
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader
231
class AskUntilSuccessMixin:
    """I send a remote call to the first available reader, and move on to
    the next one whenever a reader fails.

    The class I am mixed into must provide a '_readers' list and an
    '_upload_helper' attribute (used for logging).
    """
    _last_failure = None

    def add_reader(self, reader):
        """Append another remote reader to the list of candidates."""
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        """Invoke callRemote(*args, **kwargs) on the first reader.

        A reader whose call fails is dropped, and the same call is retried
        on the next one. NotEnoughWritersError is raised once no readers
        remain.
        """
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        current = self._readers[0]

        def _retry_with_next(f):
            self._last_failure = f
            # the reader may already have been removed by someone else
            try:
                self._readers.remove(current)
            except ValueError:
                pass
            self._upload_helper.log("call to assisted uploader %s failed" % current,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)

        d = current.callRemote(*args, **kwargs)
        d.addErrback(_retry_with_next)
        return d
254
class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure. This holds both for a normal
    fetch and for the case where the ciphertext is already on disk and I
    only poke the reader for its last byte.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        # helper: the CHKUploadHelper that owns me (its status object and
        #   its parent Helper's counters are updated as data arrives)
        # incoming_file: where partially-fetched ciphertext is accumulated
        # encoded_file: where the complete ciphertext is moved when done
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        """Emit a log message, defaulting the facility and parent."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        """Register a remote reader and schedule the fetch to start."""
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        """Begin fetching. Only the first invocation does anything."""
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            # without this errback, a failure here would leave when_done()
            # unfired forever and the upload would hang
            d.addErrback(self._failed)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        """Record the total ciphertext size announced by the reader."""
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        """Open the incoming file for append and start the fetch loop.

        Returns a Deferred that fires once the last byte has been written.
        """
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal should be to
    # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce
    # the upload bandwidth lost because this protocol is non-windowing. Too
    # large, however, means more memory consumption for both ends. Something
    # that can be transferred in, say, 10 seconds sounds about right. On my
    # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024

    def _loop(self, fire_when_done):
        """Fetch one chunk, then either finish or schedule the next one."""
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None

    def _fetch(self):
        """Request the next chunk of ciphertext.

        Returns True when nothing is left to fetch, otherwise a Deferred
        that fires with False after the chunk has been written to disk.
        """
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            # the reader answers with a sequence of data strings
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d

    def _done(self, res):
        """Close the incoming file and move it into the encoding slot."""
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        """Record the total elapsed time and fire when_done() with None."""
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        """Close any open file, drop the readers, fire when_done() with f."""
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        """Return a Deferred that fires when fetching finishes or fails."""
        return self._done_observers.when_fired()

    def get_times(self):
        """Return the timing dict ('cumulative_fetch' and 'total')."""
        return self._times

    def get_ciphertext_fetched(self):
        """Return the number of ciphertext bytes written by this fetcher."""
        return self._ciphertext_fetched
441
442
class LocalCiphertextReader(AskUntilSuccessMixin):
    """I serve ciphertext from a local file, and I forward the other
    uploadable queries (encoding parameters, plaintext hashes) to the
    remote readers that have been added to me.
    """
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._readers = []
        self._status = None

    def start(self):
        """Mark the upload as 'pushing' and open the ciphertext file."""
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file).st_size
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        """Read 'length' bytes from the local file.

        The returned Deferred fires with a one-element list holding the
        data.
        """
        assert hash_only is False

        def _as_list(data):
            return [data]

        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(_as_list)
        return d

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        return self.call("get_plaintext_hashtree_leaves",
                         first, last, num_segments)

    def get_plaintext_hash(self):
        return self.call("get_plaintext_hash")

    def close(self):
        """Close the local file, then forward 'close' to the remote reader."""
        self.f.close()
        # ??. I'm not sure if it makes sense to forward the close message.
        return self.call("close")
481
482
483
class Helper(Referenceable, service.MultiService):
    """I am the service that accepts CHK upload requests from clients and
    creates a CHKUploadHelper for each file that needs uploading. I also
    publish counters and disk-usage statistics about that work.
    """
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = {
        "http://allmydata.org/tahoe/protocols/helper/v1": {},
        "application-version": str(allmydata.__full_version__),
    }
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        for workdir in (self._chk_incoming, self._chk_encoding):
            fileutil.make_dirs(workdir)
        # storage_index -> upload helper, for uploads in progress
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = dict.fromkeys(
            ("chk_upload_helper.upload_requests",
             "chk_upload_helper.upload_already_present",
             "chk_upload_helper.upload_need_upload",
             "chk_upload_helper.resumes",
             "chk_upload_helper.fetched_bytes",
             "chk_upload_helper.encoded_bytes",
             ), 0)
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        """Log through my parent, defaulting the facility."""
        kwargs.setdefault('facility', "tahoe.helper")
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        """Add 'value' to counter 'key', and to the stats provider if any."""
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        """Return a dict of counters plus usage of the two work directories.

        For each of the incoming and encoding directories I report the
        number of files, their total size, and the total size of the files
        whose mtime is more than 48 hours old.
        """
        too_old = 86400*2 # 48hours
        now = time.time()

        def _tally(dirname, names):
            # returns (file count, total bytes, bytes in old files)
            count = total = stale = 0
            for name in names:
                st = os.stat(os.path.join(dirname, name))
                size = st[stat.ST_SIZE]
                count += 1
                total += size
                if now - st[stat.ST_MTIME] > too_old:
                    stale += size
            return (count, total, stale)

        incoming_names = os.listdir(self._chk_incoming)
        encoding_names = os.listdir(self._chk_encoding)
        (inc_count, inc_size, inc_size_old) = _tally(self._chk_incoming,
                                                     incoming_names)
        (enc_count, enc_size, enc_size_old) = _tally(self._chk_encoding,
                                                     encoding_names)

        stats = {
            'chk_upload_helper.active_uploads': len(self._active_uploads),
            'chk_upload_helper.incoming_count': inc_count,
            'chk_upload_helper.incoming_size': inc_size,
            'chk_upload_helper.incoming_size_old': inc_size_old,
            'chk_upload_helper.encoding_count': enc_count,
            'chk_upload_helper.encoding_size': enc_size,
            'chk_upload_helper.encoding_size_old': enc_size_old,
        }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        """Handle a client's request to upload the file with this SI.

        The answer is a (results, upload_helper) pair, or a Deferred for
        one. upload_helper is None when the file was already found in the
        grid.
        """
        self.count("chk_upload_helper.upload_requests")
        results = upload.UploadResults()
        started = time.time()
        si_s = si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            return self._active_uploads[storage_index].start()

        def _checked(already_present):
            results.timings['existence_check'] = time.time() - started
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (results, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there are
            # less than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index not in self._active_uploads:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  results, lp)
                self._active_uploads[storage_index] = uh
                self._add_upload(uh)
            else:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            return uh.start()

        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f

        d = self._check_for_chk_already_in_grid(storage_index, results, lp)
        d.addCallback(_checked)
        d.addErrback(_err)
        return d

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        """Return a Deferred that fires with True if the file is in the grid.

        When it is, 'results' is filled in with what the checker found.
        """
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        checker = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
                                          storage_index, lp2)

        def _checked(res):
            if not res:
                return False
            (sharemap, ueb_data, ueb_hash) = res
            self.log("found file in grid", level=log.NOISY, parent=lp)
            results.uri_extension_hash = ueb_hash
            results.sharemap = sharemap
            results.uri_extension_data = ueb_data
            results.preexisting_shares = len(sharemap)
            results.pushed_shares = 0
            return True

        d = checker.check()
        d.addCallback(_checked)
        return d

    def _add_upload(self, uh):
        """Track a new upload helper and its status object."""
        self._all_uploads[uh] = None
        status = uh.get_upload_status()
        self._all_upload_statuses[status] = None
        recent = self._recent_upload_statuses
        recent.append(status)
        # keep only the newest MAX_UPLOAD_STATUSES entries
        excess = len(recent) - self.MAX_UPLOAD_STATUSES
        if excess > 0:
            del recent[:excess]

    def upload_finished(self, storage_index, size):
        """Forget an active upload and mark its status inactive."""
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads.pop(storage_index)
        uh.get_upload_status().set_active(False)

    def get_all_upload_statuses(self):
        return self._all_upload_statuses