# src/allmydata/immutable/upload.py (tahoe-lafs)
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, NoSharesError, NoServersError, \
     InsufficientVersionError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

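# For orientation, a sketch (with hypothetical values) of the
# allocate_buckets exchange that PeerTracker.query() drives: the server
# replies with the set of share numbers it already holds, plus a dict of
# remote bucket-writer references for the shares it just allocated, and
# _got_reply() wraps each writer in a WriteBucketProxy:
#
#     alreadygot = set([0])            # share 0 was stored previously
#     buckets = {1: rref1, 2: rref2}   # newly allocated bucket writers
#     # query() then fires with (alreadygot, set([1, 2]))
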
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peerid
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = storage_broker.get_servers_for_index(storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

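        # For illustration, the full lease-secret derivation chain (values
        # hypothetical) is:
        #
        #     client_renewal_secret
        #       -> file_renewal_secret_hash(client_renewal_secret, SI)
        #          -> bucket_renewal_secret_hash(file_renewal_secret, peerid)
        #
        # so each peer receives a secret that is specific both to this file
        # and to that peer. The cancel secrets follow the same pattern.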
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
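        # Worked example of the fan-out arithmetic above: with 10 homeless
        # shares and 3 peers in contacted_peers, div_ceil(10, 3) == 4, so
        # the next peer is asked to take 4 shares and 6 remain homeless for
        # the rest of the pass.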
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "want to place %d, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.shares_of_happiness,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                if placed_shares:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

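    # Worked example of the happiness check above: with total_shares=10 and
    # shares_of_happiness=7, placing 7 or more shares counts as success;
    # placing 1-6 raises NotEnoughSharesError, and placing none raises
    # NoSharesError.
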
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


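# A minimal usage sketch (hypothetical filename): wrapping an IUploadable
# in EncryptAnUploadable yields the IEncryptedUploadable that the encoder
# and the helper protocol consume.
#
#     eu = EncryptAnUploadable(FileName("example.dat", convergence=None))
#     d = eu.get_storage_index()   # derived from the encryption key
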
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

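    # Worked example of the boundary handling above (hypothetical sizes):
    # with _segment_size=3, a 5-byte chunk sends 3 bytes to the current
    # segment hasher (which is then closed out and its digest recorded) and
    # the remaining 2 bytes to a freshly created hasher.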

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

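    # The Deferred-passed-into-a-function structure above is a general
    # trampoline for Deferred-based loops; a standalone sketch (hypothetical
    # names) of the same pattern:
    #
    #     def _loop(remaining, out, done):
    #         if not remaining:
    #             done.callback(out)
    #             return
    #         d = defer.maybeDeferred(read_one_chunk)
    #         d.addCallback(fireEventually)  # stall a tick: unwind the stack
    #         def _got(chunk):
    #             out.append(chunk)
    #             _loop(remaining - 1, out, done)
    #         d.addCallback(_got)
    #         d.addErrback(done.errback)
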
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

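# A minimal usage sketch of the helper above (names hypothetical): read
# exactly 1024 bytes from an uploadable as a list of strings, then join
# them.
#
#     d = read_this_many_bytes(uploadable, 1024)
#     d.addCallback(lambda pieces: "".join(pieces))
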
class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

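# Worked example for get_all_encoding_parameters() above: with the default
# parameters (k=3, happy=7, n=10, max_segment_size=128 KiB) and a 100-byte
# file, segsize becomes min(131072, 100) = 100, rounded up to the next
# multiple of k, so the resulting tuple is (3, 7, 10, 102).
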
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

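# A minimal usage sketch of the three uploadable flavors above (paths and
# contents hypothetical); all three provide the same IUploadable interface:
#
#     u1 = Data("some bytes", convergence=None)                # in memory
#     u2 = FileName("/tmp/example.dat", convergence=None)      # named file
#     u3 = FileHandle(open("/tmp/example.dat", "rb"),
#                     convergence=None)                        # open handle
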
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
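    # Files at or below this size are stored as "literal" (LIT) URIs: the
    # contents are packed directly into the URI string by LiteralUploader
    # above, so nothing is pushed to any storage server. For example, a
    # 10-byte file yields a URI:LIT: cap instead of a CHK upload.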

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
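
# An end-to-end usage sketch (client wiring hypothetical): the Uploader
# service picks LiteralUploader, CHKUploader, or AssistedUploader based on
# file size and helper availability, and fires with an UploadResults.
#
#     uploader = client.getServiceNamed("uploader")
#     d = uploader.upload(Data("hello world", convergence=None))
#     d.addCallback(lambda results: results.uri)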