import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError, NoServersError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

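# Illustrative sketch (not part of this module's API; the values below are
# made up): a PeerTracker.query() round-trip. The storage server replies
# with the shares it already holds plus writers for the shares it just
# allocated; _got_reply() wraps the new writers in bucket proxies and
# reports both sets to the caller.
#
#   d = tracker.query(set([0, 1, 2]))
#   # if the server already had share 1 and allocated buckets for 0 and 2,
#   # d fires with (set([1]), set([0, 2]))
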
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
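        # Illustrative note (tree shape assumed, not computed here): the
        # count depends only on the number of shares. For the default
        # total_shares=10 the leaves are padded out to the next power of
        # two, so needed_hashes(0, include_leaf=True) counts the leaf hash
        # plus one sibling hash per level on the path up to the root.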

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
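            # Illustrative example (made-up numbers): with 7 homeless
            # shares and 3 peers worth re-asking, div_ceil(7, 3) == 3, so
            # the next peer is asked to hold 3 of the remaining shares.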
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg, placed_shares,
                                           self.shares_of_happiness)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


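    # Illustrative walk-through of _update_segment_hash (made-up numbers):
    # with segment_size=1000, a 1500-byte chunk first gives the current
    # segment hasher its remaining 1000 bytes (closing segment 0 and
    # recording its digest), then starts a fresh hasher and feeds it the
    # remaining 500 bytes, which stay pending until the next chunk arrives.
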
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

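    # A minimal sketch of the same anti-recursion pattern in isolation
    # (hypothetical names, not part of this module): each iteration defers
    # to a later reactor turn via fireEventually() so the stack stays flat,
    # and the caller's Deferred fires only once the accumulator is complete.
    #
    #   def read_all(readers, results, done):
    #       if not readers:
    #           done.callback(results)
    #           return
    #       d = fireEventually(readers.pop(0)())  # stall for a tick
    #       d.addCallback(results.append)
    #       d.addCallback(lambda ign: read_all(readers, results, done))
    #       d.addErrback(done.errback)
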
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

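# Illustrative note: IUploadable.read() may return fewer bytes than
# requested, so read_this_many_bytes() keeps re-issuing reads until the
# accumulated list of strings totals exactly `size` bytes. A made-up
# session:
#
#   d = read_this_many_bytes(uploadable, 100)
#   # if the first read(100) yields a 40-byte piece, the helper recurses
#   # with size=60, and d fires with a list whose pieces sum to 100 bytes.
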
class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

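    # Illustrative comparison (made-up values): an old-style sharemap such
    # as {0: "Found on [xyz]"} maps shnums to display strings, while a
    # 1.3.0-style sharemap maps shnums to sets of 20-byte binary serverids,
    # e.g. {0: set(["\x07\x12..."])}. The type check above spots any string
    # value and discards the whole map rather than trying to convert it.
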
    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

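    # Illustrative worked example (made-up file size): with the defaults
    # k=3, happy=7, n=10, max_segsize=128 KiB, a 1000-byte file gives
    # segsize = min(131072, 1000) = 1000, rounded up to the next multiple
    # of k: next_multiple(1000, 3) = 1002. The resulting parameters are
    # (3, 7, 10, 1002).
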
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

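# Illustrative sketch of the three uploadable flavors (the convergence
# secret shown is made up; a real one comes from the client's configuration):
#
#   up1 = Data("some bytes", convergence=None)       # fresh random key
#   up2 = Data("some bytes", convergence="\x01"*16)  # deterministic key
#   up3 = FileName("/tmp/example", convergence="\x01"*16)
#
# With a non-None convergence secret, identical plaintexts produce the same
# key and therefore the same storage index, so repeated uploads deduplicate.
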
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    uploader = CHKUploader(self.parent)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
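
# Illustrative end-to-end usage (a sketch under assumptions: `client` is a
# running allmydata Client with this service attached; the names below are
# not defined in this module):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(Data("hello world", convergence=None))
#   def _uploaded(results):
#       print results.uri   # a LIT URI here, since 11 bytes <= 55
#   d.addCallback(_uploaded)
#
# Files at or below URI_LIT_SIZE_THRESHOLD (55 bytes) are embedded directly
# in a LIT URI by LiteralUploader; larger files are encrypted, erasure-coded,
# and pushed to storage servers (via the helper when one is connected).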