]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/upload.py
Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20      NotEnoughSharesError, NoSharesError, NoServersError, \
21      InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
24
25 from cStringIO import StringIO
26
27
# Binary (power-of-two) size units, used when talking about share and
# file sizes (e.g. the ~12GiB limit mentioned in peer selection below).
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
33
class HaveAllPeersError(Exception):
    """Control-flow exception: raised to jump out of a peer-query loop."""
37
# this wants to live in storage, not here
class TooFullError(Exception):
    """Presumably raised when a storage server is too full to accept a
    request; no raising site is visible in this module -- confirm."""
41
class UploadResults(Copyable, RemoteCopy):
    """Outcome of an immutable upload, copied over the wire from the
    helper to its client.

    Wire-compatibility notes: ``typeToCopy`` must keep this exact value
    (it has to match what the helper uses, and it does *not* need to
    match the fully-qualified package/module/class name). Likewise,
    think twice before changing the shape of any existing attribute --
    instances travel from helper to client, so prefer adding new fields
    over modifying existing ones.
    """
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.file_size = None
        self.uri = None
        self.timings = {}                       # name -> number of seconds
        self.sharemap = dictutil.DictOfSets()   # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets()  # {serverid: set(shnum)}
        self.ciphertext_fetched = None          # how much the helper fetched
        self.preexisting_shares = None          # count of shares already present
        self.pushed_shares = None               # count of shares we pushed
65
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
# (This constant is passed to layout.make_write_bucket_proxy wherever
# allocation sizes are computed: in PeerTracker and Tahoe2PeerSelector.)
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
74
class PeerTracker:
    """Track one storage server during peer selection.

    Sends the remote allocate_buckets query and remembers the
    write-bucket proxies the server hands back, so the encoder can later
    push share data through them.
    """
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build one throwaway proxy (rref=None) just to learn the class
        # and the allocated size; live proxies are created in _got_reply.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """Ask this server to allocate buckets for the given share numbers.

        Returns a Deferred that fires (via _got_reply) with
        (alreadygot, allocated-share-numbers).
        """
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, alreadygot_and_buckets):
        # Unpack explicitly instead of using tuple-parameter unpacking in
        # the signature: that syntax (PEP 3113) was removed in Python 3.
        (alreadygot, buckets) = alreadygot_and_buckets
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        # wrap each remote bucket ref in a write-bucket proxy of the same
        # class (and geometry) as the probe proxy built in __init__
        for sharenum, rref in buckets.items():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
130
def servers_with_unique_shares(existing_shares, used_peers=None):
    """Return a list of the distinct server ids involved in this upload.

    ``existing_shares`` maps shnum -> serverid (peerid); ``used_peers``,
    if given, is a collection of PeerTracker-like objects whose .peerid
    is included as well.

    NOTE(review): despite the name, this returns *every* server holding
    or accepting any share, not only servers whose shares are unique --
    confirm that is the intended happiness metric.
    """
    distinct = set(existing_shares.values())
    if used_peers:
        # the preexisting-shares side goes by peerid, so extract peerids
        # from the trackers before merging
        distinct.update([tracker.peerid for tracker in used_peers])
    return list(distinct)
140
def shares_by_server(existing_shares):
    """Invert a {shnum: serverid} map into {serverid: set(shnums)}.

    Built in a single pass with dict.setdefault; the previous version
    rescanned the whole map once per distinct server, which was
    quadratic in the worst case. Output is identical.
    """
    servers = {}
    for shnum, serverid in existing_shares.items():
        servers.setdefault(serverid, set()).add(shnum)
    return servers
147
class Tahoe2PeerSelector:
    """Peer-selection algorithm for immutable upload ("tahoe2").

    First pass: ask each peer (in the order given by the storage broker)
    to hold one share. Later passes: spread the remaining shares across
    the peers that accepted earlier. Selection succeeds once every share
    has a home *and* the shares span at least ``servers_of_happiness``
    distinct servers; otherwise it raises NotEnoughSharesError or
    NoSharesError.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query statistics, reported in the success/failure log messages
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, servers_of_happiness):
        """
        Entry point: set up selection state and start the query loop.

        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness

        # shares that do not yet have a server willing to hold them
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = storage_broker.get_servers_for_index(storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        # secrets are derived per-file from the client secrets, then
        # per-bucket from the file secrets plus each peerid
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d


    def _loop(self):
        """Advance the selection state machine by one step.

        Either returns the final (use_peers, preexisting_shares) result,
        raises on failure, or issues one more query and returns a
        Deferred that re-enters this loop via _got_response.
        """
        if not self.homeless_shares:
            # every share has a home; check the happiness criterion
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if self.servers_of_happiness <= len(effective_happiness):
                msg = ("placed all %d shares, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # not enough distinct servers yet: try to free up some
                # preexisting shares and re-place them on fresh peers
                delta = self.servers_of_happiness - len(effective_happiness)
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    # Loop through the allocated shares, removing
                    # one share from each server holding more than one,
                    # until `delta` shares are homeless again
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        servernum, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            del(self.preexisting_shares[share])
                            items.append((servernum, sharelist))
                    return self._loop()
                else:
                    raise NotEnoughSharesError("shares could only be placed on %d "
                                            "servers (%d were requested)" %
                                            (len(effective_happiness),
                                             self.servers_of_happiness))

        if self.uncontacted_peers:
            # first pass: one share per fresh peer
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            # divide the remaining shares roughly evenly among the peers
            # still willing to take more
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if len(effective_happiness) < self.servers_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "want to place on %d servers, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.servers_of_happiness,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                if placed_shares:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Handle the (callback or errback) result of one allocate_buckets
        query, update bookkeeping, and re-enter _loop."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            # the shares we asked about are homeless again
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                if self.preexisting_shares.has_key(s):
                    # this share is already recorded elsewhere: only move
                    # the record here if doing so increases the number of
                    # distinct servers
                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
                    new_candidate = self.preexisting_shares.copy()
                    new_candidate[s] = peer.peerid
                    new_size = len(servers_with_unique_shares(new_candidate))
                    if old_size >= new_size: continue
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
442
443
444 class EncryptAnUploadable:
445     """This is a wrapper that takes an IUploadable and provides
446     IEncryptedUploadable."""
447     implements(IEncryptedUploadable)
448     CHUNKSIZE = 50*1024
449
450     def __init__(self, original, log_parent=None):
451         self.original = IUploadable(original)
452         self._log_number = log_parent
453         self._encryptor = None
454         self._plaintext_hasher = plaintext_hasher()
455         self._plaintext_segment_hasher = None
456         self._plaintext_segment_hashes = []
457         self._encoding_parameters = None
458         self._file_size = None
459         self._ciphertext_bytes_read = 0
460         self._status = None
461
462     def set_upload_status(self, upload_status):
463         self._status = IUploadStatus(upload_status)
464         self.original.set_upload_status(upload_status)
465
466     def log(self, *args, **kwargs):
467         if "facility" not in kwargs:
468             kwargs["facility"] = "upload.encryption"
469         if "parent" not in kwargs:
470             kwargs["parent"] = self._log_number
471         return log.msg(*args, **kwargs)
472
473     def get_size(self):
474         if self._file_size is not None:
475             return defer.succeed(self._file_size)
476         d = self.original.get_size()
477         def _got_size(size):
478             self._file_size = size
479             if self._status:
480                 self._status.set_size(size)
481             return size
482         d.addCallback(_got_size)
483         return d
484
485     def get_all_encoding_parameters(self):
486         if self._encoding_parameters is not None:
487             return defer.succeed(self._encoding_parameters)
488         d = self.original.get_all_encoding_parameters()
489         def _got(encoding_parameters):
490             (k, happy, n, segsize) = encoding_parameters
491             self._segment_size = segsize # used by segment hashers
492             self._encoding_parameters = encoding_parameters
493             self.log("my encoding parameters: %s" % (encoding_parameters,),
494                      level=log.NOISY)
495             return encoding_parameters
496         d.addCallback(_got)
497         return d
498
499     def _get_encryptor(self):
500         if self._encryptor:
501             return defer.succeed(self._encryptor)
502
503         d = self.original.get_encryption_key()
504         def _got(key):
505             e = AES(key)
506             self._encryptor = e
507
508             storage_index = storage_index_hash(key)
509             assert isinstance(storage_index, str)
510             # There's no point to having the SI be longer than the key, so we
511             # specify that it is truncated to the same 128 bits as the AES key.
512             assert len(storage_index) == 16  # SHA-256 truncated to 128b
513             self._storage_index = storage_index
514             if self._status:
515                 self._status.set_storage_index(storage_index)
516             return e
517         d.addCallback(_got)
518         return d
519
520     def get_storage_index(self):
521         d = self._get_encryptor()
522         d.addCallback(lambda res: self._storage_index)
523         return d
524
525     def _get_segment_hasher(self):
526         p = self._plaintext_segment_hasher
527         if p:
528             left = self._segment_size - self._plaintext_segment_hashed_bytes
529             return p, left
530         p = plaintext_segment_hasher()
531         self._plaintext_segment_hasher = p
532         self._plaintext_segment_hashed_bytes = 0
533         return p, self._segment_size
534
535     def _update_segment_hash(self, chunk):
536         offset = 0
537         while offset < len(chunk):
538             p, segment_left = self._get_segment_hasher()
539             chunk_left = len(chunk) - offset
540             this_segment = min(chunk_left, segment_left)
541             p.update(chunk[offset:offset+this_segment])
542             self._plaintext_segment_hashed_bytes += this_segment
543
544             if self._plaintext_segment_hashed_bytes == self._segment_size:
545                 # we've filled this segment
546                 self._plaintext_segment_hashes.append(p.digest())
547                 self._plaintext_segment_hasher = None
548                 self.log("closed hash [%d]: %dB" %
549                          (len(self._plaintext_segment_hashes)-1,
550                           self._plaintext_segment_hashed_bytes),
551                          level=log.NOISY)
552                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
553                          segnum=len(self._plaintext_segment_hashes)-1,
554                          hash=base32.b2a(p.digest()),
555                          level=log.NOISY)
556
557             offset += this_segment
558
559
560     def read_encrypted(self, length, hash_only):
561         # make sure our parameters have been set up first
562         d = self.get_all_encoding_parameters()
563         # and size
564         d.addCallback(lambda ignored: self.get_size())
565         d.addCallback(lambda ignored: self._get_encryptor())
566         # then fetch and encrypt the plaintext. The unusual structure here
567         # (passing a Deferred *into* a function) is needed to avoid
568         # overflowing the stack: Deferreds don't optimize out tail recursion.
569         # We also pass in a list, to which _read_encrypted will append
570         # ciphertext.
571         ciphertext = []
572         d2 = defer.Deferred()
573         d.addCallback(lambda ignored:
574                       self._read_encrypted(length, ciphertext, hash_only, d2))
575         d.addCallback(lambda ignored: d2)
576         return d
577
578     def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
579         if not remaining:
580             fire_when_done.callback(ciphertext)
581             return None
582         # tolerate large length= values without consuming a lot of RAM by
583         # reading just a chunk (say 50kB) at a time. This only really matters
584         # when hash_only==True (i.e. resuming an interrupted upload), since
585         # that's the case where we will be skipping over a lot of data.
586         size = min(remaining, self.CHUNKSIZE)
587         remaining = remaining - size
588         # read a chunk of plaintext..
589         d = defer.maybeDeferred(self.original.read, size)
590         # N.B.: if read() is synchronous, then since everything else is
591         # actually synchronous too, we'd blow the stack unless we stall for a
592         # tick. Once you accept a Deferred from IUploadable.read(), you must
593         # be prepared to have it fire immediately too.
594         d.addCallback(fireEventually)
595         def _good(plaintext):
596             # and encrypt it..
597             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
598             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
599             ciphertext.extend(ct)
600             self._read_encrypted(remaining, ciphertext, hash_only,
601                                  fire_when_done)
602         def _err(why):
603             fire_when_done.errback(why)
604         d.addCallback(_good)
605         d.addErrback(_err)
606         return None
607
608     def _hash_and_encrypt_plaintext(self, data, hash_only):
609         assert isinstance(data, (tuple, list)), type(data)
610         data = list(data)
611         cryptdata = []
612         # we use data.pop(0) instead of 'for chunk in data' to save
613         # memory: each chunk is destroyed as soon as we're done with it.
614         bytes_processed = 0
615         while data:
616             chunk = data.pop(0)
617             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
618                      level=log.NOISY)
619             bytes_processed += len(chunk)
620             self._plaintext_hasher.update(chunk)
621             self._update_segment_hash(chunk)
622             # TODO: we have to encrypt the data (even if hash_only==True)
623             # because pycryptopp's AES-CTR implementation doesn't offer a
624             # way to change the counter value. Once pycryptopp acquires
625             # this ability, change this to simply update the counter
626             # before each call to (hash_only==False) _encryptor.process()
627             ciphertext = self._encryptor.process(chunk)
628             if hash_only:
629                 self.log("  skipping encryption", level=log.NOISY)
630             else:
631                 cryptdata.append(ciphertext)
632             del ciphertext
633             del chunk
634         self._ciphertext_bytes_read += bytes_processed
635         if self._status:
636             progress = float(self._ciphertext_bytes_read) / self._file_size
637             self._status.set_progress(1, progress)
638         return cryptdata
639
640
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred that fires with a tuple of the plaintext
        segment-hash leaves for segments [first:last]."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            # the per-segment hasher is finished: drop it so it cannot be
            # accidentally reused for a later segment
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
658
659     def get_plaintext_hash(self):
660         h = self._plaintext_hasher.digest()
661         return defer.succeed(h)
662
    def close(self):
        # delegate to the wrapped uploadable; we hold no resources of our own
        return self.original.close()
665
class UploadStatus:
    """Mutable record of one upload's progress, exposed through the
    IUploadStatus interface for status displays."""
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        # each status instance gets a unique id and a start timestamp
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None

    # accessors

    def get_started(self):
        return self.started

    def get_storage_index(self):
        return self.storage_index

    def get_size(self):
        return self.size

    def using_helper(self):
        return self.helper

    def get_status(self):
        return self.status

    def get_progress(self):
        # hand out an immutable snapshot of the three progress values
        return tuple(self.progress)

    def get_active(self):
        return self.active

    def get_results(self):
        return self.results

    def get_counter(self):
        return self.counter

    # mutators

    def set_storage_index(self, si):
        self.storage_index = si

    def set_size(self, size):
        self.size = size

    def set_helper(self, helper):
        self.helper = helper

    def set_status(self, status):
        self.status = status

    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    def set_active(self, value):
        self.active = value

    def set_results(self, value):
        self.results = value
715
class CHKUploader:
    """Uploads one immutable CHK file: runs peer selection, then drives an
    encode.Encoder to push erasure-coded shares to the chosen servers."""
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        # parent=None is required here: self._log_number does not exist yet,
        # so self.log() must not try to default to it
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        # log to the tahoe.upload facility, parented to our own log entry
        # unless the caller overrides either
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        # pipeline: learn encoding params, pick peers, assign shares to
        # them, then run the encoder and summarize the results
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection; returns a Deferred firing with the peer
        selector's (used_peers, already_peers) result."""
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        # abbreviated storage index, used only for log readability
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap[shnum] = peer.peerid
        # no share number may be claimed by two different trackers
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        # record which peer ended up holding each placed share
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
855
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable' (whose read() method
    returns a Deferred firing with a list of strings), recursing to finish
    short reads.

    Returns a Deferred that fires with prepend_data followed by the
    newly-read strings. Note that a size of 0 yields [] regardless of
    prepend_data (preserved historical behavior).
    """
    # default to None instead of a mutable [] literal, which would be a
    # single list object shared across every call of this function
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # renamed from 'bytes' to avoid shadowing the builtin
        bytes_read = sum([len(piece) for piece in data])
        assert bytes_read > 0
        assert bytes_read <= size
        remaining = size - bytes_read
        if remaining:
            # short read: accumulate what we have and ask for the rest
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
872
class LiteralUploader:
    """Uploads a small file by packing its entire contents into a LIT URI;
    no data is ever sent to storage servers."""

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)
        self._status = status

    def start(self, uploadable):
        """Read the whole uploadable and return a Deferred firing with the
        UploadResults carrying the LIT URI."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _got_data(data):
            # the file's bytes become the URI itself
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_got_data)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # nothing to clean up: we never open connections or files
        pass

    def get_upload_status(self):
        return self._status
910
class RemoteEncryptedUploadable(Referenceable):
    """Wraps an IEncryptedUploadable so a remote helper can pull our
    ciphertext over foolscap via the RIEncryptedUploadable interface."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # _offset: how far into the ciphertext the helper has consumed or
        # skipped; _bytes_sent: bytes actually returned to the helper
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the file size, caching it after
        the first lookup."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read 'length' ciphertext bytes (hash-and-discard when hash_only
        is True) and advance self._offset accordingly."""
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                # hash_only reads return no data, so trust the request size
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        """Return a Deferred firing with a list of ciphertext strings for
        the range [offset, offset+length); offset must not go backwards."""
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            # account for bytes actually shipped to the helper
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
980
981
class AssistedUploader:
    """Uploads a file with the help of a remote upload helper, which does
    the encoding and share placement on our behalf."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        # parent messages to our own log entry unless overridden
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # pipeline: size -> encoding params -> ask helper -> build verifycap
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper (by storage index) whether the file needs to be
        uploaded; fires with its (upload_results, upload_helper) answer."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        # upload_helper is non-None only when the helper still needs our
        # ciphertext; otherwise upload_results already describes the file
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Attach the verify-cap string and timing data to the results and
        return them."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # sanity-check that the helper used the parameters we expected
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # keep the helper's own "total" timing under a distinct key
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1106
class BaseUploadable:
    """Common machinery for IUploadable implementations: holds the encoding
    parameters (k, happy, n, segment size) and their defaults."""
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install new defaults from a dict of str->int entries; only the
        keys that are present override the class-level defaults."""
        assert isinstance(default_params, dict)
        for key, value in default_params.items():
            precondition(isinstance(key, str), key, value)
            precondition(isinstance(value, int), key, value)
        # map each recognized key onto the attribute it overrides
        for key, attr in [("k", "default_encoding_param_k"),
                          ("happy", "default_encoding_param_happy"),
                          ("n", "default_encoding_param_n"),
                          ("max_segment_size", "default_max_segment_size")]:
            if key in default_params:
                setattr(self, attr, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        computing and caching the tuple on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space, but keep it a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1158
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        """Return a Deferred firing with the content-derived (convergent)
        16-byte AES key, computing and caching it on first use."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # get_size() sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            (k, happy, n, segsize) = params
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f = self._filehandle
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            # read until EOF ("" sentinel), hashing each block
            for data in iter(lambda: f.read(BLOCKSIZE), ""):
                enckey_hasher.update(data)
                # TODO: updating progress in a non-yielding loop is mostly
                # pointless until this work is handed to something that can
                # yield time to other jobs
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        # lazily generate (and then cache) a random 16-byte key
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        """Return a Deferred firing with the file size, measuring and
        caching it on first use."""
        if self._size is None:
            # seek to EOF to learn the length, then rewind for reading
            self._filehandle.seek(0, 2)
            self._size = self._filehandle.tell()
            self._filehandle.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        # one underlying read, wrapped in a single-element list
        data = self._filehandle.read(length)
        return defer.succeed([data])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1236
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # unlike FileHandle, we opened the filehandle ourselves, so we are
        # responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
1250
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory string in a file-like object so FileHandle can
        # treat it like any other filehandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1261
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    # files at or below this many bytes become LIT URIs instead of CHK files
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            # establish (and foolscap will maintain) a connection to the
            # upload helper
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # fallback version info for helpers that predate get_version()
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
        # NOTE(review): no errback is attached to d, so a failure in the
        # version query appears to be dropped silently -- confirm intended

    def _got_versioned_helper(self, helper):
        # refuse to use a helper that doesn't speak the v1 protocol
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # small enough to pack into the URI itself: no servers used
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                # _parentmsgid presumably comes from PrefixingLogMixin, to
                # parent the encrypter's log entries to ours -- TODO confirm
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # delegate encoding and share placement to the helper
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # no helper: select peers and encode locally
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # always close the uploadable, on success or failure
            uploadable.close()
            return res
        d.addBoth(_done)
        return d