# tahoe-lafs/tahoe-lafs.git: src/allmydata/immutable/upload.py
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError, NoServersError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

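# A minimal sketch of the query/reply cycle (hypothetical share numbers;
# 'tracker' is a PeerTracker). The Deferred fires with (alreadygot,
# allocated): the shnums the server already held, and the subset of our
# request it accepted buckets for.
#   d = tracker.query(set([0, 1]))
#   d.addCallback(lambda (alreadygot, allocated): ...)
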
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
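        # e.g. (hypothetical ids) already_peers might be {0: peerid_a,
        # 3: peerid_b} if shares 0 and 3 were already found on the grid.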

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

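    # _loop implements the actual placement algorithm: the first pass asks
    # each uncontacted peer to hold one share; peers that accept everything
    # we ask of them go into contacted_peers, and on later passes each such
    # peer is asked to hold an even split of whatever shares remain
    # homeless. We stop once every share has a home, or fail with
    # NotEnoughSharesError if we run out of peers before placing at least
    # shares_of_happiness shares.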
    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg, placed_shares,
                                           self.shares_of_happiness)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
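            # e.g. (hypothetical shnums) shares_to_ask={2,3,4},
            # alreadygot={2}, allocated={3}: not_yet_present={3,4} and
            # still_homeless={4}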

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
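
    # A minimal usage sketch (hypothetical; 'up' is any IUploadable):
    #   eu = EncryptAnUploadable(up)
    #   d = eu.get_storage_index()
    #   d.addCallback(lambda si: eu.read_encrypted(1024, hash_only=False))
    # read_encrypted fires with a list of ciphertext strings.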

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

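    # Worked example for _update_segment_hash (hypothetical sizes): with
    # _segment_size=3, a single chunk "abcde" hashes "abc" into one closed
    # segment digest, then starts a fresh hasher holding "de" (2 bytes
    # hashed so far, to be completed by the next chunk).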

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
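            # e.g. (hypothetical numbers) file_size=1000, k=3:
            # segsize = min(131072, 1000) = 1000, rounded up to the next
            # multiple of 3, i.e. 1002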
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
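
# A minimal usage sketch (hypothetical; 'uploader' is the running Uploader
# service attached to a client node):
#   d = uploader.upload(Data("example bytes", convergence=None))
#   d.addCallback(lambda results: results.uri)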

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
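    # files at or below this many bytes are stored as "LIT" URIs, which
    # embed the file data directly in the URI instead of uploading shares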
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    uploader = CHKUploader(self.parent)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d