import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable, DeadReferenceError
from foolscap.eventual import eventually
import allmydata
from allmydata import interfaces, storage, uri
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import idlib, log, observer, fileutil, hashutil


class NotEnoughWritersError(Exception):
    pass

class CHKCheckerAndUEBFetcher:
    """I check to see if a file is already present in the grid. I also fetch
    the URI Extension Block, which is useful for an uploading client who
    wants to avoid the work of encryption and encoding.

    I return False if the file is not completely healthy: i.e. if there are
    fewer than 'N' shares present.

    If the file is completely healthy, I return a tuple of (sharemap,
    UEB_data, UEB_hash).
    """
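    # The healthy-result tuple looks roughly like this (hypothetical
    # values, for illustration only):
    #   sharemap: {0: [peerid_a], 1: [peerid_a, peerid_b], ...}
    #   UEB_data: {"needed_shares": 3, "total_shares": 10, ...}
    #   UEB_hash: hashutil.uri_extension_hash(<serialized UEB bytes>)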

    def __init__(self, peer_getter, storage_index, logparent=None):
        self._peer_getter = peer_getter
        self._found_shares = set()
        self._storage_index = storage_index
        self._sharemap = {}
        self._readers = set()
        self._ueb_hash = None
        self._ueb_data = None
        self._logparent = logparent

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
        if 'parent' not in kwargs:
            kwargs['parent'] = self._logparent
        return log.msg(*args, **kwargs)

    def check(self):
        d = self._get_all_shareholders(self._storage_index)
        d.addCallback(self._get_uri_extension)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self, storage_index):
        dl = []
        for (peerid, ss) in self._peer_getter("storage", storage_index):
            d = ss.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        return defer.DeferredList(dl)
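    # (the DeferredList above never fails: _got_error consumes per-server
    # failures, so an unreachable server simply contributes no shares)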

    def _got_response(self, buckets, peerid):
        # buckets is a dict: it maps shnum to an rref of a bucket reader on
        # the server that holds that share
        shnums_s = ",".join([str(shnum) for shnum in buckets])
        self.log("got_response: [%s] has %d shares (%s)" %
                 (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
                 level=log.NOISY)
        self._found_shares.update(buckets.keys())
        for k in buckets:
            if k not in self._sharemap:
                self._sharemap[k] = []
            self._sharemap[k].append(peerid)
        self._readers.update([(bucket, peerid)
                              for bucket in buckets.values()])

    def _got_error(self, f):
        if f.check(DeadReferenceError):
            return
        log.err(f, parent=self._logparent)

    def _get_uri_extension(self, res):
        # assume that we can pull the UEB from any share. If we get an
        # error, declare the whole file unavailable.
        if not self._readers:
            self.log("no readers, so no UEB", level=log.NOISY)
            return
        b, peerid = self._readers.pop()
        rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
        d = rbp.get_uri_extension()
        d.addCallback(self._got_uri_extension)
        d.addErrback(self._ueb_error)
        return d
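    # (ReadBucketProxy understands the immutable-share layout directly, so
    # a single surviving bucket is enough to recover the UEB)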

    def _got_uri_extension(self, ueb):
        self.log("_got_uri_extension", level=log.NOISY)
        self._ueb_hash = hashutil.uri_extension_hash(ueb)
        self._ueb_data = uri.unpack_extension(ueb)

    def _ueb_error(self, f):
        # an error means the file is unavailable, but the overall check
        # shouldn't fail.
        self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
        return None

    def _done(self, res):
        if self._ueb_data:
            found = len(self._found_shares)
            total = self._ueb_data['total_shares']
            self.log(format="got %(found)d shares of %(total)d",
                     found=found, total=total, level=log.NOISY)
            if found < total:
                # not all shares are present in the grid
                self.log("not enough to qualify, file not found in grid",
                         level=log.NOISY)
                return False
            # all shares are present
            self.log("all shares present, file is found in grid",
                     level=log.NOISY)
            return (self._sharemap, self._ueb_data, self._ueb_hash)
        # no shares are present
        self.log("unable to find UEB data, file not found in grid",
                 level=log.NOISY)
        return False


class CHKUploadHelper(Referenceable, upload.CHKUploader):
    """I am the helper-server-side counterpart to AssistedUploader. I handle
    peer selection, encoding, and share pushing. I read ciphertext from the
    remote AssistedUploader.
    """
    implements(interfaces.RICHKUploadHelper)
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
                 { },
                "application-version": str(allmydata.__version__),
                }

    def __init__(self, storage_index, helper,
                 incoming_file, encoding_file,
                 results, log_number):
        self._storage_index = storage_index
        self._helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoding_file
        self._upload_id = storage.si_b2a(storage_index)[:5]
        self._log_number = log_number
        self._results = results
        self._upload_status = upload.UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_storage_index(storage_index)
        self._upload_status.set_status("fetching ciphertext")
        self._upload_status.set_progress(0, 1.0)
        self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
                         parent=log_number)

        self._client = helper.parent
        self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
                                             self._log_number)
        self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
        self._finished_observers = observer.OneShotObserverList()

        d = self._fetcher.when_done()
        d.addCallback(lambda res: self._reader.start())
        d.addCallback(lambda res: self.start_encrypted(self._reader))
        d.addCallback(self._finished)
        d.addErrback(self._failed)
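        # The pipeline sketched by the chain above: the fetcher pulls
        # ciphertext from the remote reader(s) into encoding_file, the
        # LocalCiphertextReader then feeds that file to the inherited
        # CHKUploader encode-and-push machinery, and _finished/_failed
        # report the outcome to anyone waiting on _finished_observers.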

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper.chk"
        return upload.CHKUploader.log(self, *args, **kwargs)

    def start(self):
        self._started = time.time()
        # Report how far along we already are. In every branch we return
        # (UploadResults, self), telling the caller to send (or resume
        # sending) ciphertext; the already-in-grid case is handled by the
        # Helper before this upload helper is created, and returns
        # (UploadResults, None) instead.
        self.log("deciding whether to upload the file or not", level=log.NOISY)
        if os.path.exists(self._encoding_file):
            # we have the whole file, and we might be encoding it (or the
            # encode/upload might have failed, and we need to restart it).
            self.log("ciphertext already in place", level=log.UNUSUAL)
            return (self._results, self)
        if os.path.exists(self._incoming_file):
            # we have some of the file, but not all of it (otherwise we'd be
            # encoding). The caller will be asked to send us the rest.
            self.log("partial ciphertext already present", level=log.UNUSUAL)
            return (self._results, self)
        # we don't remember uploading this file
        self.log("no ciphertext yet", level=log.NOISY)
        return (self._results, self)

    def remote_get_version(self):
        return self.VERSION

    def remote_upload(self, reader):
        # reader is an RIEncryptedUploadable. I am specified to return an
        # UploadResults dictionary.

        # let our fetcher pull ciphertext from the reader.
        self._fetcher.add_reader(reader)
        # and also hashes
        self._reader.add_reader(reader)

        # and inform the client when the upload has finished
        return self._finished_observers.when_fired()
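    # (remote_upload may be called more than once, e.g. by a second client
    # uploading the same file while this one is active; each call just adds
    # another reader for the AskUntilSuccessMixin machinery to fall back on)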

    def _finished(self, res):
        (uri_extension_hash, needed_shares, total_shares, size) = res
        r = self._results
        r.uri_extension_hash = uri_extension_hash
        f_times = self._fetcher.get_times()
        r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
        r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
        r.timings["total_fetch"] = f_times["total"]
        self._reader.close()
        os.unlink(self._encoding_file)
        self._finished_observers.fire(r)
        self._helper.upload_finished(self._storage_index, size)
        del self._reader

    def _failed(self, f):
        self.log(format="CHKUploadHelper(%(si)s) failed",
                 si=storage.si_b2a(self._storage_index)[:5],
                 failure=f,
                 level=log.UNUSUAL)
        self._finished_observers.fire(f)
        self._helper.upload_finished(self._storage_index, 0)
        del self._reader


class AskUntilSuccessMixin:
    # create me with a _readers list
    _last_failure = None

    def add_reader(self, reader):
        self._readers.append(reader)

    def call(self, *args, **kwargs):
        if not self._readers:
            raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
        rr = self._readers[0]
        d = rr.callRemote(*args, **kwargs)
        def _err(f):
            self._last_failure = f
            if rr in self._readers:
                self._readers.remove(rr)
            self._upload_helper.log("call to assisted uploader %s failed" % rr,
                                    failure=f, level=log.UNUSUAL)
            # we can try again with someone else who's left
            return self.call(*args, **kwargs)
        d.addErrback(_err)
        return d
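
    # A minimal usage sketch (assuming self._readers holds at least one
    # remote RIEncryptedUploadable reference):
    #
    #     d = self.call("get_size")
    #     d.addCallback(lambda size: self.call("read_encrypted", 0, size))
    #
    # Each failure drops the offending reader and retries the same call on
    # the next one, until none remain.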


class CHKCiphertextFetcher(AskUntilSuccessMixin):
    """I use one or more remote RIEncryptedUploadable instances to gather
    ciphertext on disk. When I'm done, the file I create can be used by a
    LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
    process.

    I begin pulling ciphertext as soon as a reader is added. I remove readers
    when they have any sort of error. If the last reader is removed, I fire
    my when_done() Deferred with a failure.

    I fire my when_done() Deferred (with None) immediately after I have moved
    the ciphertext to 'encoded_file'.
    """

    def __init__(self, helper, incoming_file, encoded_file, logparent):
        self._upload_helper = helper
        self._incoming_file = incoming_file
        self._encoding_file = encoded_file
        self._upload_id = helper._upload_id
        self._log_parent = logparent
        self._done_observers = observer.OneShotObserverList()
        self._readers = []
        self._started = False
        self._f = None
        self._times = {
            "cumulative_fetch": 0.0,
            "total": 0.0,
            }
        self._ciphertext_fetched = 0

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.helper.chkupload.fetch"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_parent
        return log.msg(*args, **kwargs)

    def add_reader(self, reader):
        AskUntilSuccessMixin.add_reader(self, reader)
        eventually(self._start)

    def _start(self):
        if self._started:
            return
        self._started = True
        started = time.time()

        if os.path.exists(self._encoding_file):
            self.log("ciphertext already present, bypassing fetch",
                     level=log.UNUSUAL)
            # we'll still need the plaintext hashes (when
            # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
            # called), and currently the easiest way to get them is to ask
            # the sender for the last byte of ciphertext. That will provoke
            # them into reading and hashing (but not sending) everything
            # else.
            have = os.stat(self._encoding_file)[stat.ST_SIZE]
            d = self.call("read_encrypted", have-1, 1)
            d.addCallback(self._done2, started)
            return

        # first, find out how large the file is going to be
        d = self.call("get_size")
        d.addCallback(self._got_size)
        d.addCallback(self._start_reading)
        d.addCallback(self._done)
        d.addCallback(self._done2, started)
        d.addErrback(self._failed)

    def _got_size(self, size):
        self.log("total size is %d bytes" % size, level=log.NOISY)
        self._upload_helper._upload_status.set_size(size)
        self._expected_size = size

    def _start_reading(self, res):
        # then find out how much crypttext we have on disk
        if os.path.exists(self._incoming_file):
            self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
            self._upload_helper._helper.count("chk_upload_helper.resumes")
            self.log("we already have %d bytes" % self._have, level=log.NOISY)
        else:
            self._have = 0
            self.log("we do not have any ciphertext yet", level=log.NOISY)
        self.log("starting ciphertext fetch", level=log.NOISY)
        self._f = open(self._incoming_file, "ab")

        # now loop to pull the data from the readers
        d = defer.Deferred()
        self._loop(d)
        # this Deferred will be fired once the last byte has been written to
        # self._f
        return d

    # read data in 50kB chunks. We should choose a more considered number
    # here, possibly letting the client specify it. The goal should be to
    # keep RTT*bandwidth below 10% of the chunk size, to limit the upload
    # bandwidth lost to the fact that this protocol is non-windowing. Too
    # large, however, means more memory consumption for both ends. Something
    # that can be transferred in, say, 10 seconds sounds about right. On my
    # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
    # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
    # memory than I want to hang on to, so I'm going to go with 50kB and see
    # how that works.
    CHUNK_SIZE = 50*1024
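
    # Worked example of the 10% rule above (hypothetical link): on a line
    # with 100ms RTT and 50kBps upstream, RTT*bandwidth is about 5kB, i.e.
    # exactly 10% of CHUNK_SIZE, so roughly one chunk-time in ten is spent
    # waiting for the next request rather than sending.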

    def _loop(self, fire_when_done):
        # this slightly weird structure is needed because Deferreds don't do
        # tail-recursion, so it is important to let each one retire promptly.
        # Simply chaining them will cause a stack overflow at the end of a
        # transfer that involves more than a few hundred chunks.
        # 'fire_when_done' lives a long time, but the Deferreds returned by
        # the inner _fetch() call do not.
        start = time.time()
        d = defer.maybeDeferred(self._fetch)
        def _done(finished):
            elapsed = time.time() - start
            self._times["cumulative_fetch"] += elapsed
            if finished:
                self.log("finished reading ciphertext", level=log.NOISY)
                fire_when_done.callback(None)
            else:
                self._loop(fire_when_done)
        def _err(f):
            self.log(format="[%(si)s] ciphertext read failed",
                     si=self._upload_id, failure=f, level=log.UNUSUAL)
            fire_when_done.errback(f)
        d.addCallbacks(_done, _err)
        return None

    def _fetch(self):
        needed = self._expected_size - self._have
        fetch_size = min(needed, self.CHUNK_SIZE)
        if fetch_size == 0:
            self._upload_helper._upload_status.set_progress(1, 1.0)
            return True # all done
        percent = 0.0
        if self._expected_size:
            percent = 1.0 * (self._have+fetch_size) / self._expected_size
        self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
                 si=self._upload_id,
                 start=self._have,
                 end=self._have+fetch_size,
                 total=self._expected_size,
                 percent=int(100.0*percent),
                 level=log.NOISY)
        d = self.call("read_encrypted", self._have, fetch_size)
        def _got_data(ciphertext_v):
            for data in ciphertext_v:
                self._f.write(data)
                self._have += len(data)
                self._ciphertext_fetched += len(data)
                self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
                self._upload_helper._upload_status.set_progress(1, percent)
            return False # not done
        d.addCallback(_got_data)
        return d
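    # (_fetch fires with True when the transfer is complete and with False
    # when _loop should run another pass; e.g. with _expected_size=150000
    # and _have=102400 already on disk, needed=47600 < CHUNK_SIZE, so one
    # final partial read finishes the file and the pass after that returns
    # True)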

    def _done(self, res):
        self._f.close()
        self._f = None
        self.log(format="done fetching ciphertext, size=%(size)d",
                 size=os.stat(self._incoming_file)[stat.ST_SIZE],
                 level=log.NOISY)
        os.rename(self._incoming_file, self._encoding_file)

    def _done2(self, _ignored, started):
        self.log("done2", level=log.NOISY)
        elapsed = time.time() - started
        self._times["total"] = elapsed
        self._readers = []
        self._done_observers.fire(None)

    def _failed(self, f):
        if self._f:
            self._f.close()
        self._readers = []
        self._done_observers.fire(f)

    def when_done(self):
        return self._done_observers.when_fired()

    def get_times(self):
        return self._times

    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched


class LocalCiphertextReader(AskUntilSuccessMixin):
    implements(interfaces.IEncryptedUploadable)

    def __init__(self, upload_helper, storage_index, encoding_file):
        self._readers = []
        self._upload_helper = upload_helper
        self._storage_index = storage_index
        self._encoding_file = encoding_file
        self._status = None

    def start(self):
        self._upload_helper._upload_status.set_status("pushing")
        self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
        self.f = open(self._encoding_file, "rb")

    def get_size(self):
        return defer.succeed(self._size)

    def get_all_encoding_parameters(self):
        return self.call("get_all_encoding_parameters")

    def get_storage_index(self):
        return defer.succeed(self._storage_index)

    def read_encrypted(self, length, hash_only):
        assert hash_only is False
        d = defer.maybeDeferred(self.f.read, length)
        d.addCallback(lambda data: [data])
        return d

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        return self.call("get_plaintext_hashtree_leaves", first, last,
                         num_segments)

    def get_plaintext_hash(self):
        return self.call("get_plaintext_hash")

    def close(self):
        self.f.close()
        # ?? I'm not sure if it makes sense to forward the close message.
        return self.call("close")
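
    # Note the split above: get_size, get_storage_index, and read_encrypted
    # are answered from the local ciphertext file, while the encoding
    # parameters and the plaintext-hash queries must still be forwarded to
    # the original uploader, since the helper never sees plaintext.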


class Helper(Referenceable, service.MultiService):
    implements(interfaces.RIHelper, interfaces.IStatsProducer)
    # this is the non-distributed version. When we need to have multiple
    # helpers, this object will become the HelperCoordinator, and will query
    # the farm of Helpers to see if anyone has the storage_index of interest,
    # and send the request off to them. If nobody has it, we'll choose a
    # helper at random.

    name = "helper"
    VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                 { },
                "application-version": str(allmydata.__version__),
                }
    chk_upload_helper_class = CHKUploadHelper
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, basedir, stats_provider=None):
        self._basedir = basedir
        self._chk_incoming = os.path.join(basedir, "CHK_incoming")
        self._chk_encoding = os.path.join(basedir, "CHK_encoding")
        fileutil.make_dirs(self._chk_incoming)
        fileutil.make_dirs(self._chk_encoding)
        self._active_uploads = {}
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        self.stats_provider = stats_provider
        if stats_provider:
            stats_provider.register_producer(self)
        self._counters = {"chk_upload_helper.upload_requests": 0,
                          "chk_upload_helper.upload_already_present": 0,
                          "chk_upload_helper.upload_need_upload": 0,
                          "chk_upload_helper.resumes": 0,
                          "chk_upload_helper.fetched_bytes": 0,
                          "chk_upload_helper.encoded_bytes": 0,
                          }
        service.MultiService.__init__(self)

    def setServiceParent(self, parent):
        service.MultiService.setServiceParent(self, parent)

    def log(self, *args, **kwargs):
        if 'facility' not in kwargs:
            kwargs['facility'] = "tahoe.helper"
        return self.parent.log(*args, **kwargs)

    def count(self, key, value=1):
        if self.stats_provider:
            self.stats_provider.count(key, value)
        self._counters[key] += value

    def get_stats(self):
        OLD = 86400*2 # 48 hours
        now = time.time()
        inc_count = inc_size = inc_size_old = 0
        enc_count = enc_size = enc_size_old = 0
        inc = os.listdir(self._chk_incoming)
        enc = os.listdir(self._chk_encoding)
        for f in inc:
            s = os.stat(os.path.join(self._chk_incoming, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            inc_count += 1
            inc_size += size
            if now - mtime > OLD:
                inc_size_old += size
        for f in enc:
            s = os.stat(os.path.join(self._chk_encoding, f))
            size = s[stat.ST_SIZE]
            mtime = s[stat.ST_MTIME]
            enc_count += 1
            enc_size += size
            if now - mtime > OLD:
                enc_size_old += size
        stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
                  'chk_upload_helper.incoming_count': inc_count,
                  'chk_upload_helper.incoming_size': inc_size,
                  'chk_upload_helper.incoming_size_old': inc_size_old,
                  'chk_upload_helper.encoding_count': enc_count,
                  'chk_upload_helper.encoding_size': enc_size,
                  'chk_upload_helper.encoding_size_old': enc_size_old,
                  }
        stats.update(self._counters)
        return stats

    def remote_get_version(self):
        return self.VERSION

    def remote_upload_chk(self, storage_index):
        self.count("chk_upload_helper.upload_requests")
        r = upload.UploadResults()
        started = time.time()
        si_s = storage.si_b2a(storage_index)
        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        if storage_index in self._active_uploads:
            self.log("upload is currently active", parent=lp)
            uh = self._active_uploads[storage_index]
            return uh.start()

        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
        def _checked(already_present):
            elapsed = time.time() - started
            r.timings['existence_check'] = elapsed
            if already_present:
                # the necessary results are placed in the UploadResults
                self.count("chk_upload_helper.upload_already_present")
                self.log("file already found in grid", parent=lp)
                return (r, None)

            self.count("chk_upload_helper.upload_need_upload")
            # the file is not present in the grid, by which we mean there
            # are fewer than 'N' shares available.
            self.log("unable to find file in the grid", parent=lp,
                     level=log.NOISY)
            # We need an upload helper. Check our active uploads again in
            # case there was a race.
            if storage_index in self._active_uploads:
                self.log("upload is currently active", parent=lp)
                uh = self._active_uploads[storage_index]
            else:
                self.log("creating new upload helper", parent=lp)
                uh = self.chk_upload_helper_class(storage_index, self,
                                                  incoming_file, encoding_file,
                                                  r, lp)
                self._active_uploads[storage_index] = uh
                self._add_upload(uh)
            return uh.start()
        d.addCallback(_checked)
        def _err(f):
            self.log("error while checking for chk-already-in-grid",
                     failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
            return f
        d.addErrback(_err)
        return d
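    # (the remote_upload_chk contract, as exercised above: the client gets
    # back (UploadResults, None) when the file is already in the grid, or
    # (UploadResults, rref-to-CHKUploadHelper) when it still needs to push
    # ciphertext)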

    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
        # see if this file is already in the grid
        lp2 = self.log("doing a quick check+UEBfetch",
                       parent=lp, level=log.NOISY)
        c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
                                    storage_index, lp2)
        d = c.check()
        def _checked(res):
            if res:
                (sharemap, ueb_data, ueb_hash) = res
                self.log("found file in grid", level=log.NOISY, parent=lp)
                results.uri_extension_hash = ueb_hash
                results.sharemap = {}
                for shnum, peerids in sharemap.items():
                    peers_s = ",".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                                        for peerid in peerids])
                    results.sharemap[shnum] = "Found on " + peers_s
                results.uri_extension_data = ueb_data
                results.preexisting_shares = len(sharemap)
                results.pushed_shares = 0
                return True
            return False
        d.addCallback(_checked)
        return d

    def _add_upload(self, uh):
        self._all_uploads[uh] = None
        s = uh.get_upload_status()
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def upload_finished(self, storage_index, size):
        # this is called with size=0 if the upload failed
        self.count("chk_upload_helper.encoded_bytes", size)
        uh = self._active_uploads[storage_index]
        del self._active_uploads[storage_index]
        s = uh.get_upload_status()
        s.set_active(False)

    def get_all_upload_statuses(self):
        return self._all_upload_statuses