]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/upload.py
immutable: tidy-up some code by using a set instead of list to hold homeless_shares
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
26
27 from cStringIO import StringIO
28
29
# binary (power-of-two) size units, in bytes
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
35
class HaveAllPeersError(Exception):
    """Control-flow exception, raised to jump out of a loop once we
    have all the peers we need."""
39
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server cannot take more data."""
43
class UploadResults(Copyable, RemoteCopy):
    """Bundle of statistics and results describing a completed upload.

    Instances are serialized by foolscap (Copyable/RemoteCopy) and sent
    from the helper to its client, so the copyable type tag and the
    shape of every attribute are part of the wire protocol.
    """
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
66
67
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000 # bytes reserved per share for the uri_extension
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
76
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: peerids} map as a short human-readable string."""
    descriptions = []
    for shnum, peerids in s.iteritems():
        joined = '+'.join([idlib.shortnodeid_b2a(peerid) for peerid in peerids])
        descriptions.append("sh%s: %s" % (shnum, joined))
    return ', '.join(descriptions)
79
class PeerTracker:
    """I track one storage server's participation in a single upload:
    I issue allocate_buckets/get_buckets queries to it and remember the
    remote bucket writers it hands back, so they can be used (or
    aborted) later."""
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # build a throwaway write-bucket proxy just to learn how much
        # space a share of this size will occupy on the server
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """Ask the server to allocate buckets for the given share numbers.
        Returns a Deferred that (via _got_reply) fires with
        (alreadygot, set-of-allocated-sharenums)."""
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """Ask the server which shares it already holds for our storage
        index; returns the remote get_buckets() Deferred."""
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # wrap each raw bucket rref in a write-bucket proxy and remember it
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]
156
157
class Tahoe2PeerSelector(log.PrefixingLogMixin):
    """I implement the "Tahoe2" peer-selection algorithm for immutable
    uploads: first ask readonly peers about shares they already hold
    for this storage index, then query writable peers (one share per
    peer on the first pass, larger batches on later passes) until every
    share has a home or no peers remain. Success is judged against the
    servers_of_happiness criterion."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query_count is the total number of queries sent; good/bad split
        # it by whether the query placed any shares
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        I am the entry point: I seed the selection state, ask readonly
        peers about existing shares, and then kick off the _loop state
        machine.

        @return: (upload_servers, already_peers), where upload_servers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shareids are stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # shares that do not yet have a server willing to hold them
        self.homeless_shares = set(range(total_shares))
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           # one PeerTracker per (peerid, conn), each with per-bucket
           # renewal/cancel secrets derived from the file-level secrets
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            self.log("asking peer %s for any existing shares" %
                     (idlib.shortnodeid_b2a(peer.peerid),),
                    level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the get_buckets() queries sent to the
        readonly peers in get_shareholders. `peer` is a peerid.
        """
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            self.log("response to get_buckets() from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                self.homeless_shares.discard(bucket)
            # a readonly peer can't accept new shares, so even a successful
            # response counts as a full/unproductive query
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Return a one-line human-readable summary of placement progress,
        for logs and error messages."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """One step of the selection state machine: declare success, try
        to redistribute already-placed shares, query the next peer, or
        fail. Re-entered (directly or via _got_response) until it either
        returns (use_peers, preexisting_shares) or calls _failed."""
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_peers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    msg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (msg, self._get_progress_message()))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            # first pass: ask each fresh peer for just one share
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            # spread the remaining shares evenly over the peers left
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """I handle the response (or Failure) from one allocate_buckets
        query, update the bookkeeping, and then re-enter _loop. On full
        success the peer is appended to put_peer_here for another pass."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            # the shares we asked about go back into the homeless pool
            self.homeless_shares |= shares_to_ask
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when peer selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for peer in self.use_peers:
            assert isinstance(peer, PeerTracker)

            peer.abort()

        raise UploadUnhappinessError(msg)
545
546
547 class EncryptAnUploadable:
548     """This is a wrapper that takes an IUploadable and provides
549     IEncryptedUploadable."""
550     implements(IEncryptedUploadable)
551     CHUNKSIZE = 50*1024
552
553     def __init__(self, original, log_parent=None):
554         self.original = IUploadable(original)
555         self._log_number = log_parent
556         self._encryptor = None
557         self._plaintext_hasher = plaintext_hasher()
558         self._plaintext_segment_hasher = None
559         self._plaintext_segment_hashes = []
560         self._encoding_parameters = None
561         self._file_size = None
562         self._ciphertext_bytes_read = 0
563         self._status = None
564
565     def set_upload_status(self, upload_status):
566         self._status = IUploadStatus(upload_status)
567         self.original.set_upload_status(upload_status)
568
569     def log(self, *args, **kwargs):
570         if "facility" not in kwargs:
571             kwargs["facility"] = "upload.encryption"
572         if "parent" not in kwargs:
573             kwargs["parent"] = self._log_number
574         return log.msg(*args, **kwargs)
575
576     def get_size(self):
577         if self._file_size is not None:
578             return defer.succeed(self._file_size)
579         d = self.original.get_size()
580         def _got_size(size):
581             self._file_size = size
582             if self._status:
583                 self._status.set_size(size)
584             return size
585         d.addCallback(_got_size)
586         return d
587
588     def get_all_encoding_parameters(self):
589         if self._encoding_parameters is not None:
590             return defer.succeed(self._encoding_parameters)
591         d = self.original.get_all_encoding_parameters()
592         def _got(encoding_parameters):
593             (k, happy, n, segsize) = encoding_parameters
594             self._segment_size = segsize # used by segment hashers
595             self._encoding_parameters = encoding_parameters
596             self.log("my encoding parameters: %s" % (encoding_parameters,),
597                      level=log.NOISY)
598             return encoding_parameters
599         d.addCallback(_got)
600         return d
601
602     def _get_encryptor(self):
603         if self._encryptor:
604             return defer.succeed(self._encryptor)
605
606         d = self.original.get_encryption_key()
607         def _got(key):
608             e = AES(key)
609             self._encryptor = e
610
611             storage_index = storage_index_hash(key)
612             assert isinstance(storage_index, str)
613             # There's no point to having the SI be longer than the key, so we
614             # specify that it is truncated to the same 128 bits as the AES key.
615             assert len(storage_index) == 16  # SHA-256 truncated to 128b
616             self._storage_index = storage_index
617             if self._status:
618                 self._status.set_storage_index(storage_index)
619             return e
620         d.addCallback(_got)
621         return d
622
623     def get_storage_index(self):
624         d = self._get_encryptor()
625         d.addCallback(lambda res: self._storage_index)
626         return d
627
628     def _get_segment_hasher(self):
629         p = self._plaintext_segment_hasher
630         if p:
631             left = self._segment_size - self._plaintext_segment_hashed_bytes
632             return p, left
633         p = plaintext_segment_hasher()
634         self._plaintext_segment_hasher = p
635         self._plaintext_segment_hashed_bytes = 0
636         return p, self._segment_size
637
638     def _update_segment_hash(self, chunk):
639         offset = 0
640         while offset < len(chunk):
641             p, segment_left = self._get_segment_hasher()
642             chunk_left = len(chunk) - offset
643             this_segment = min(chunk_left, segment_left)
644             p.update(chunk[offset:offset+this_segment])
645             self._plaintext_segment_hashed_bytes += this_segment
646
647             if self._plaintext_segment_hashed_bytes == self._segment_size:
648                 # we've filled this segment
649                 self._plaintext_segment_hashes.append(p.digest())
650                 self._plaintext_segment_hasher = None
651                 self.log("closed hash [%d]: %dB" %
652                          (len(self._plaintext_segment_hashes)-1,
653                           self._plaintext_segment_hashed_bytes),
654                          level=log.NOISY)
655                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
656                          segnum=len(self._plaintext_segment_hashes)-1,
657                          hash=base32.b2a(p.digest()),
658                          level=log.NOISY)
659
660             offset += this_segment
661
662
663     def read_encrypted(self, length, hash_only):
664         # make sure our parameters have been set up first
665         d = self.get_all_encoding_parameters()
666         # and size
667         d.addCallback(lambda ignored: self.get_size())
668         d.addCallback(lambda ignored: self._get_encryptor())
669         # then fetch and encrypt the plaintext. The unusual structure here
670         # (passing a Deferred *into* a function) is needed to avoid
671         # overflowing the stack: Deferreds don't optimize out tail recursion.
672         # We also pass in a list, to which _read_encrypted will append
673         # ciphertext.
674         ciphertext = []
675         d2 = defer.Deferred()
676         d.addCallback(lambda ignored:
677                       self._read_encrypted(length, ciphertext, hash_only, d2))
678         d.addCallback(lambda ignored: d2)
679         return d
680
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Read and encrypt up to 'remaining' bytes, one chunk at a time.

        Appends ciphertext strings to the caller-supplied 'ciphertext'
        list and fires 'fire_when_done' with that list once all requested
        bytes are processed. Re-invokes itself from a Deferred callback
        (rather than looping or tail-calling) so a large read cannot
        overflow the stack.
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
710
711     def _hash_and_encrypt_plaintext(self, data, hash_only):
712         assert isinstance(data, (tuple, list)), type(data)
713         data = list(data)
714         cryptdata = []
715         # we use data.pop(0) instead of 'for chunk in data' to save
716         # memory: each chunk is destroyed as soon as we're done with it.
717         bytes_processed = 0
718         while data:
719             chunk = data.pop(0)
720             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
721                      level=log.NOISY)
722             bytes_processed += len(chunk)
723             self._plaintext_hasher.update(chunk)
724             self._update_segment_hash(chunk)
725             # TODO: we have to encrypt the data (even if hash_only==True)
726             # because pycryptopp's AES-CTR implementation doesn't offer a
727             # way to change the counter value. Once pycryptopp acquires
728             # this ability, change this to simply update the counter
729             # before each call to (hash_only==False) _encryptor.process()
730             ciphertext = self._encryptor.process(chunk)
731             if hash_only:
732                 self.log("  skipping encryption", level=log.NOISY)
733             else:
734                 cryptdata.append(ciphertext)
735             del ciphertext
736             del chunk
737         self._ciphertext_bytes_read += bytes_processed
738         if self._status:
739             progress = float(self._ciphertext_bytes_read) / self._file_size
740             self._status.set_progress(1, progress)
741         return cryptdata
742
743
744     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
745         # this is currently unused, but will live again when we fix #453
746         if len(self._plaintext_segment_hashes) < num_segments:
747             # close out the last one
748             assert len(self._plaintext_segment_hashes) == num_segments-1
749             p, segment_left = self._get_segment_hasher()
750             self._plaintext_segment_hashes.append(p.digest())
751             del self._plaintext_segment_hasher
752             self.log("closing plaintext leaf hasher, hashed %d bytes" %
753                      self._plaintext_segment_hashed_bytes,
754                      level=log.NOISY)
755             self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
756                      segnum=len(self._plaintext_segment_hashes)-1,
757                      hash=base32.b2a(p.digest()),
758                      level=log.NOISY)
759         assert len(self._plaintext_segment_hashes) == num_segments
760         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
761
762     def get_plaintext_hash(self):
763         h = self._plaintext_hasher.digest()
764         return defer.succeed(h)
765
    def close(self):
        """Close the wrapped uploadable (delegates to the original)."""
        return self.original.close()
768
class UploadStatus:
    """Tracks the progress and outcome of a single upload.

    This is a simple record object: plain getters/setters over a handful
    of fields, plus a process-wide counter that gives each upload a
    unique id for the status display.
    """
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # progress slots: [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started

    def get_storage_index(self):
        return self.storage_index

    def get_size(self):
        return self.size

    def using_helper(self):
        return self.helper

    def get_status(self):
        return self.status

    def get_progress(self):
        # return an immutable snapshot
        return tuple(self.progress)

    def get_active(self):
        return self.active

    def get_results(self):
        return self.results

    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si

    def set_size(self, size):
        self.size = size

    def set_helper(self, helper):
        self.helper = helper

    def set_status(self, status):
        self.status = status

    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    def set_active(self, value):
        self.active = value

    def set_results(self, value):
        self.results = value
818
class CHKUploader:
    """Uploads one immutable CHK file directly to storage servers.

    Drives the full pipeline: wrap the encrypted uploadable, run peer
    selection, hand the resulting shareholders to the Encoder, then
    collect placement and timing data into an UploadResults instance.
    """
    # the peer-selection strategy to use; tests may substitute their own
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        """Log a message, defaulting to this upload's parent log entry
        and the tahoe.upload facility."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # on success or failure, the upload is no longer active
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        # pipeline: encoder setup -> peer selection -> hand out shares ->
        # encode+push -> assemble results
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection for this upload.

        Returns a Deferred firing with the (upload_servers, already_peers)
        tuple produced by the peer selector; also records elapsed-time
        stats for the storage-index and peer-selection phases.
        """
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_servers, already_peers), encoder):
        """
        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in upload_servers:
            assert isinstance(peer, PeerTracker)
        # build the shnum->bucket and shnum->set(peerid) maps the encoder needs
        buckets = {}
        servermap = already_peers.copy()
        for peer in upload_servers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        # record which peer ended up holding each share that was placed
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
959
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable'.

    IUploadable.read() may return fewer bytes than requested, so we keep
    asking (recursing through the Deferred) until the full amount has
    been accumulated. Returns a Deferred that fires with a list of
    strings: prepend_data followed by the newly-read pieces.

    @param prepend_data: already-read pieces to include at the front of
        the result. Defaults to an empty list; None is used as the
        sentinel so a mutable list is never shared across calls.
    """
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # 'num_bytes' rather than 'bytes': don't shadow the builtin
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
976
class LiteralUploader:
    """Uploads small files by embedding their contents in a LIT URI.

    No data is sent to any server: the whole file fits inside the URI
    itself, so the 'upload' is just a read plus an encode.
    """

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)
        self._status = status

    def start(self, uploadable):
        """Read the whole uploadable; returns a Deferred firing with the
        UploadResults instance (whose .uri is the LIT URI string)."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _make_uri(data):
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_make_uri)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        # mark everything finished: a LIT upload completes synchronously
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
1014
class RemoteEncryptedUploadable(Referenceable):
    """Exposes an IEncryptedUploadable to a remote upload helper.

    Tracks the current read offset (backward seeks are rejected, but
    skipping forward is allowed and hashes the skipped data) and counts
    the ciphertext bytes actually sent.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the file size (cached)."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _remember(size):
            self._size = size
            return size
        d.addCallback(_remember)
        return d

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read ciphertext and advance our notion of the current offset."""
        d = self._eu.read_encrypted(length, hash_only)
        def _advance(strings):
            if hash_only:
                # no data is returned when hashing only; assume the full
                # requested span was consumed
                self._offset += length
            else:
                self._offset += sum([len(data) for data in strings])
            return strings
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _count(strings):
            self._bytes_sent += sum([len(data) for data in strings])
            return strings
        d.addCallback(_count)
        return d

    def remote_close(self):
        return self._eu.close()
1084
1085
class AssistedUploader:
    """Uploads an immutable file with the assistance of a helper node.

    The helper either reports the file as already present in the grid,
    or asks us to stream it the ciphertext (via a
    RemoteEncryptedUploadable) so it can do the encoding and pushing on
    our behalf.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        """Log a message, defaulting to this upload's parent log entry."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # pipeline: size -> encoding params -> helper negotiation -> verifycap
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # on success or failure, the upload is no longer active
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; returns a Deferred
        firing with its (upload_results, upload_helper) answer."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        """Handle the helper's reply: stream it the ciphertext if it
        asked for it, otherwise the file is already uploaded."""
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Attach the verify-cap string and timing data to the results."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # the helper must have used the same encoding parameters we expect
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own 'total' under a distinct key
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1210
class BaseUploadable:
    """Common encoding-parameter plumbing shared by IUploadable classes.

    Subclasses provide get_size(); this class resolves the
    (k, happy, n, segment_size) tuple from per-instance overrides,
    client-supplied defaults, and the class-level fallbacks, caching the
    result after the first computation.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-wide defaults: a dict with optional 'k',
        'happy', 'n', and 'max_segment_size' integer entries."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size)."""
        cached = self._all_encoding_parameters
        if cached:
            return defer.succeed(cached)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space; it must remain a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d = self.get_size()
        d.addCallback(_got_size)
        return d
1262
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        """Derive the key by hashing the entire plaintext (cached)."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            k, happy, n, segsize = params
            fh = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            fh.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = fh.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            fh.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the 16-byte AES key."""
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        """Return a Deferred firing with the file size (cached)."""
        if self._size is None:
            fh = self._filehandle
            fh.seek(0, 2)
            self._size = fh.tell()
            fh.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1340
class FileName(FileHandle):
    """An uploadable that reads its plaintext from a named file. Unlike
    FileHandle, it opened the file itself, so close() also closes it."""

    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        fh = open(filename, "rb")
        FileHandle.__init__(self, fh, convergence=convergence)

    def close(self):
        # Run the base-class cleanup first, then close the filehandle that
        # we (not our caller) opened in __init__.
        FileHandle.close(self)
        self._filehandle.close()
1354
class Data(FileHandle):
    """An uploadable that serves its plaintext from an in-memory string,
    by wrapping it in a StringIO and delegating to FileHandle."""

    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        buf = StringIO(data)
        FileHandle.__init__(self, buf, convergence=convergence)
1365
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    # Files of this size or smaller are stored entirely in the URI (a LIT
    # cap) by LiteralUploader instead of being uploaded to storage servers.
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        # helper_furl: optional FURL of an upload helper; when set,
        # startService initiates a connection and uploads are delegated to
        # an AssistedUploader once the helper is reachable.
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        # remote helper reference; None until connected (see
        # _got_versioned_helper) and reset to None on disconnect.
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        # Begin connecting to the helper (if one was configured); the
        # connection callback fires _got_helper whenever a connection is
        # (re-)established.
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        # Connection established: fetch the helper's version information,
        # substituting a default for old helpers lacking get_version().
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        # Accept the helper only if it speaks the v1 helper protocol;
        # otherwise raise so the connection attempt fails visibly.
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        # Helper connection dropped: fall back to direct CHK uploads until
        # connectTo re-establishes it.
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # Push the client's default encoding parameters (k/happy/N,
            # segment size) into the uploadable before choosing a strategy.
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # Tiny file: embed the data directly in a LIT URI; no
                # encryption or storage servers involved.
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # Delegate erasure-coding/pushing to the connected
                    # helper; it needs the storage index up front.
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # No helper: encode and upload shares ourselves.
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # Always close the uploadable, on success or failure, and pass
            # the result/failure through unchanged.
            uploadable.close()
            return res
        d.addBoth(_done)
        return d