# tahoe-lafs: src/allmydata/immutable/upload.py
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_peers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
     DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

def pretty_print_shnum_to_servers(s):
    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
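# Illustrative only: given a map like {0: set([peerid_a]), 1: set([peerid_a,
# peerid_b])}, this renders something like "sh0: abcde, sh1: abcde+fghij",
# where "abcde" and "fghij" stand in for shortened base32 nodeids.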

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

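    # A note on the reply shape (descriptive, based on how _got_reply uses
    # it): allocate_buckets answers with a 2-tuple (alreadygot, buckets),
    # where alreadygot is a set of share numbers the server already holds,
    # and buckets maps each newly-allocated share number to a remote bucket
    # writer reference, which we wrap in a write bucket proxy.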

    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]


def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)

class Tahoe2PeerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_servers, already_peers), where upload_servers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shareids are stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
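        # A rough worked example (a sketch, not normative): with
        # total_shares=10, the binary hash tree is padded out to 16 leaves,
        # so validating leaf 0 should need the leaf hash itself plus one
        # sibling/uncle hash per level:
        #   ht = hashtree.IncompleteHashTree(10)
        #   len(ht.needed_hashes(0, include_leaf=True))  # expect 5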

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = [(s.get_serverid(), s.get_rref())
                     for s in storage_broker.get_servers_for_psi(storage_index)]
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
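        # (Only peers within the first 2*total_shares of the permuted list
        # are considered below; servers that far down the permuted order are
        # unlikely to hold shares for this SI.)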
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            self.log("asking peer %s for any existing shares" %
                     (idlib.shortnodeid_b2a(peer.peerid),),
                    level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the get_buckets queries sent to the read-only
        peers in Tahoe2PeerSelector.get_shareholders.
        """
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            self.log("response to get_buckets() from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                self.homeless_shares.discard(bucket)
            self.full_count += 1
            self.bad_query_count += 1
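            # (Read-only peers can never accept new shares, so for the
            # progress-message accounting these queries are counted as
            # "full"/unproductive, whatever they returned.)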


    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                    "self.use_peers: %s, self.preexisting_shares: %s") \
                        % (self, self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged),
                        [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
                            for p in self.use_peers],
                        pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by giving _loop
                # a chance to spread those out over the other peers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
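                # For instance (illustrative numbers): if server A already
                # holds shares {0, 1, 2} and server B holds {3}, then
                # shares_to_spread is (3-1) + (1-1) = 2 shares that could
                # be moved without emptying any server.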
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_peers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    failmsg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

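            # On this first pass we ask each fresh peer for just one share;
            # the second pass (below) spreads any remaining homeless shares
            # more aggressively across the peers that said yes.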
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
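            # e.g. with 7 homeless shares and 3 peers left to re-ask, the
            # next query asks for div_ceil(7, 3) = 3 shares.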
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when peer selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for peer in self.use_peers:
            assert isinstance(peer, PeerTracker)

            peer.abort()

        raise UploadUnhappinessError(msg)


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

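    # For instance (illustrative): with segment_size=3, feeding a 5-byte
    # chunk through _update_segment_hash closes out the current leaf hash
    # after 3 bytes and starts a fresh hasher for the remaining 2, so chunk
    # boundaries never have to line up with segment boundaries.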

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d
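    # Typical use (a sketch): a caller such as the encoder does
    #   d = eu.read_encrypted(length, hash_only=False)
    #   d.addCallback(lambda chunks: ...)
    # and receives a list of ciphertext strings totalling at most `length`
    # bytes; with hash_only=True the plaintext is hashed but the list stays
    # empty.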

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start()?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_servers, already_peers), encoder):
        """
        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
            shares for us (the shareids are stashed inside the PeerTracker)
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
        values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
            for p in upload_servers], already_peers)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in upload_servers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in upload_servers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(peer.buckets) for peer in upload_servers]),
                [(p.buckets, p.peerid) for p in upload_servers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns the UploadResults instance, filled in with the outcome
        of this upload. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
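# Illustrative: if uploadable.read(100) yields pieces totalling 40 bytes,
# read_this_many_bytes recurses with size=60 and the pieces collected so far
# in prepend_data, until exactly `size` bytes have been accumulated.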

class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seeking backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d
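    # For example (illustrative): if the helper interrupted a previous
    # upload after fetching 1000 bytes of ciphertext, it resumes with
    # remote_read_encrypted(1000, n); we first consume bytes 0..999 with
    # hash_only=True so the plaintext hash trees stay complete, then return
    # real ciphertext starting at offset 1000.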

    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d
1150
1151     def _got_size(self, size):
1152         self._size = size
1153         self._upload_status.set_size(size)
1154
1155     def _got_all_encoding_parameters(self, params):
1156         k, happy, n, segment_size = params
1157         # stash these for URI generation later
1158         self._needed_shares = k
1159         self._total_shares = n
1160         self._segment_size = segment_size
1161
1162     def _contact_helper(self, res):
1163         now = self._time_contacting_helper_start = time.time()
1164         self._storage_index_elapsed = now - self._started
1165         self.log(format="contacting helper for SI %(si)s..",
1166                  si=si_b2a(self._storage_index), level=log.NOISY)
1167         self._upload_status.set_status("Contacting Helper")
1168         d = self._helper.callRemote("upload_chk", self._storage_index)
1169         d.addCallback(self._contacted_helper)
1170         return d
1171
1172     def _contacted_helper(self, (upload_results, upload_helper)):
1173         now = time.time()
1174         elapsed = now - self._time_contacting_helper_start
1175         self._elapsed_time_contacting_helper = elapsed
1176         if upload_helper:
1177             self.log("helper says we need to upload", level=log.NOISY)
1178             self._upload_status.set_status("Uploading Ciphertext")
1179             # we need to upload the file
1180             reu = RemoteEncryptedUploadable(self._encuploadable,
1181                                             self._upload_status)
1182             # let it pre-compute the size for progress purposes
1183             d = reu.get_size()
1184             d.addCallback(lambda ignored:
1185                           upload_helper.callRemote("upload", reu))
1186             # this Deferred will fire with the upload results
1187             return d
1188         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1189         self._upload_status.set_progress(1, 1.0)
1190         self._upload_status.set_results(upload_results)
1191         return upload_results
1192
1193     def _convert_old_upload_results(self, upload_results):
1194         # pre-1.3.0 helpers return upload results which contain a mapping
1195         # from shnum to a single human-readable string, containing things
1196         # like "Found on [x],[y],[z]" (for healthy files that were already in
1197         # the grid), "Found on [x]" (for files that needed upload but which
1198         # discovered pre-existing shares), and "Placed on [x]" (for newly
1199         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1200         # set of binary serverid strings.
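        #
        # for illustration, the two shapes look roughly like:
        #   old:  {0: "Found on [x],[y]"}            (shnum -> one string)
        #   new:  {0: set([serverid_a, serverid_b])}  (shnum -> set of serverids)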
1201
1202         # the old results are too hard to deal with (they don't even contain
1203         # as much information as the new results, since the nodeids are
1204         # abbreviated), so if we detect old results, just clobber them.
1205
1206         sharemap = upload_results.sharemap
1207         if str in [type(v) for v in sharemap.values()]:
1208             upload_results.sharemap = None
1209
1210     def _build_verifycap(self, upload_results):
1211         self.log("upload finished, building readcap", level=log.OPERATIONAL)
1212         self._convert_old_upload_results(upload_results)
1213         self._upload_status.set_status("Building Readcap")
1214         r = upload_results
1215         assert r.uri_extension_data["needed_shares"] == self._needed_shares
1216         assert r.uri_extension_data["total_shares"] == self._total_shares
1217         assert r.uri_extension_data["segment_size"] == self._segment_size
1218         assert r.uri_extension_data["size"] == self._size
1219         r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1220                                                 uri_extension_hash=r.uri_extension_hash,
1221                                                 needed_shares=self._needed_shares,
1222                                                 total_shares=self._total_shares,
1223                                                 size=self._size).to_string()
1224         now = time.time()
1225         r.file_size = self._size
1226         r.timings["storage_index"] = self._storage_index_elapsed
1227         r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1228         if "total" in r.timings:
1229             r.timings["helper_total"] = r.timings["total"]
1230         r.timings["total"] = now - self._started
1231         self._upload_status.set_status("Finished")
1232         self._upload_status.set_results(r)
1233         return r
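    # note that the helper only ever sees ciphertext, so the most it can
    # build is a verify cap; Uploader.upload (below) combines verifycapstr
    # with the locally-held encryption key to form the full CHK read cap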
1234
1235     def get_upload_status(self):
1236         return self._upload_status
1237
1238 class BaseUploadable:
1239     # these defaults apply unless the per-instance attributes below are set
1240     default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
1241     default_encoding_param_k = 3
1242     default_encoding_param_happy = 7
1243     default_encoding_param_n = 10
1244
1245     max_segment_size = None
1246     encoding_param_k = None
1247     encoding_param_happy = None
1248     encoding_param_n = None
1249
1250     _all_encoding_parameters = None
1251     _status = None
1252
1253     def set_upload_status(self, upload_status):
1254         self._status = IUploadStatus(upload_status)
1255
1256     def set_default_encoding_parameters(self, default_params):
1257         assert isinstance(default_params, dict)
1258         for k,v in default_params.items():
1259             precondition(isinstance(k, str), k, v)
1260             precondition(isinstance(v, int), k, v)
1261         if "k" in default_params:
1262             self.default_encoding_param_k = default_params["k"]
1263         if "happy" in default_params:
1264             self.default_encoding_param_happy = default_params["happy"]
1265         if "n" in default_params:
1266             self.default_encoding_param_n = default_params["n"]
1267         if "max_segment_size" in default_params:
1268             self.default_max_segment_size = default_params["max_segment_size"]
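        # for example, a client typically passes something like
        #   {"k": 3, "happy": 7, "n": 10, "max_segment_size": 128*KiB}
        # and any key left out simply keeps the class-level default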
1269
1270     def get_all_encoding_parameters(self):
1271         if self._all_encoding_parameters:
1272             return defer.succeed(self._all_encoding_parameters)
1273
1274         max_segsize = self.max_segment_size or self.default_max_segment_size
1275         k = self.encoding_param_k or self.default_encoding_param_k
1276         happy = self.encoding_param_happy or self.default_encoding_param_happy
1277         n = self.encoding_param_n or self.default_encoding_param_n
1278
1279         d = self.get_size()
1280         def _got_size(file_size):
1281             # for small files, shrink the segment size to avoid wasting space
1282             segsize = min(max_segsize, file_size)
1283             # this must be a multiple of 'required_shares'==k
1284             segsize = mathutil.next_multiple(segsize, k)
1285             encoding_parameters = (k, happy, n, segsize)
1286             self._all_encoding_parameters = encoding_parameters
1287             return encoding_parameters
1288         d.addCallback(_got_size)
1289         return d
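    # worked example (illustrative numbers): with the defaults k=3 and
    # max_segsize=128*KiB, a 1000-byte file gets segsize=min(131072, 1000)
    # = 1000, rounded up to 1002 (the next multiple of k), so it is encoded
    # as one short padded segment instead of a mostly-empty 128KiB one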
1290
1291 class FileHandle(BaseUploadable):
1292     implements(IUploadable)
1293
1294     def __init__(self, filehandle, convergence):
1295         """
1296         Upload the data from the filehandle.  If convergence is None then a
1297         random encryption key will be used, else the plaintext will be hashed,
1298         then the hash will be hashed together with the string in the
1299         "convergence" argument to form the encryption key.
1300         """
1301         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1302         self._filehandle = filehandle
1303         self._key = None
1304         self.convergence = convergence
1305         self._size = None
1306
1307     def _get_encryption_key_convergent(self):
1308         if self._key is not None:
1309             return defer.succeed(self._key)
1310
1311         d = self.get_size()
1312         # that sets self._size as a side-effect
1313         d.addCallback(lambda size: self.get_all_encoding_parameters())
1314         def _got(params):
1315             k, happy, n, segsize = params
1316             f = self._filehandle
1317             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1318             f.seek(0)
1319             BLOCKSIZE = 64*1024
1320             bytes_read = 0
1321             while True:
1322                 data = f.read(BLOCKSIZE)
1323                 if not data:
1324                     break
1325                 enckey_hasher.update(data)
1326                 # TODO: setting progress in a non-yielding loop is kind of
1327                 # pointless, but I'm anticipating (perhaps prematurely) the
1328                 # day when we use a slowjob or twisted's CooperatorService to
1329                 # make this yield time to other jobs.
1330                 bytes_read += len(data)
1331                 if self._status:
1332                     self._status.set_progress(0, float(bytes_read)/self._size)
1333             f.seek(0)
1334             self._key = enckey_hasher.digest()
1335             if self._status:
1336                 self._status.set_progress(0, 1.0)
1337             assert len(self._key) == 16
1338             return self._key
1339         d.addCallback(_got)
1340         return d
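    # note what convergence buys us: re-uploading the same plaintext with the
    # same convergence string derives the same 16-byte AES key, hence the
    # same storage index, so duplicate uploads converge on the same shares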
1341
1342     def _get_encryption_key_random(self):
1343         if self._key is None:
1344             self._key = os.urandom(16)
1345         return defer.succeed(self._key)
1346
1347     def get_encryption_key(self):
1348         if self.convergence is not None:
1349             return self._get_encryption_key_convergent()
1350         else:
1351             return self._get_encryption_key_random()
1352
1353     def get_size(self):
1354         if self._size is not None:
1355             return defer.succeed(self._size)
1356         self._filehandle.seek(0, 2) # 2 == SEEK_END: jump to the end to learn the size
1357         size = self._filehandle.tell()
1358         self._size = size
1359         self._filehandle.seek(0)
1360         return defer.succeed(size)
1361
1362     def read(self, length):
1363         return defer.succeed([self._filehandle.read(length)]) # IUploadable.read fires with a list of chunks
1364
1365     def close(self):
1366         # the originator of the filehandle reserves the right to close it
1367         pass
1368
1369 class FileName(FileHandle):
1370     def __init__(self, filename, convergence):
1371         """
1372         Upload the data from the filename.  If convergence is None then a
1373         random encryption key will be used, else the plaintext will be hashed,
1374         then the hash will be hashed together with the string in the
1375         "convergence" argument to form the encryption key.
1376         """
1377         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1378         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1379     def close(self):
1380         FileHandle.close(self)
1381         self._filehandle.close()
1382
1383 class Data(FileHandle):
1384     def __init__(self, data, convergence):
1385         """
1386         Upload the data from the data argument.  If convergence is None then a
1387         random encryption key will be used, else the plaintext will be hashed,
1388         then the hash will be hashed together with the string in the
1389         "convergence" argument to form the encryption key.
1390         """
1391         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1392         FileHandle.__init__(self, StringIO(data), convergence=convergence)
1393
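# illustrative sketch, not used by this module: the IUploadable wrappers
# above in action. 'uploader' is assumed to be a running Uploader service
# (defined below) and 'convergence_secret' a client-chosen string; the
# filename is hypothetical.
def _example_uploadables(uploader, convergence_secret):
    d1 = uploader.upload(Data("some bytes", convergence=convergence_secret))
    d2 = uploader.upload(FileName("/tmp/example.dat", convergence=None)) # random key
    return d1, d2
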
1394 class Uploader(service.MultiService, log.PrefixingLogMixin):
1395     """I am a service that allows file uploading. I am a service-child of the
1396     Client.
1397     """
1398     implements(IUploader)
1399     name = "uploader"
1400     URI_LIT_SIZE_THRESHOLD = 55
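    # files of at most this many bytes become LIT URIs, which embed the
    # (base32-encoded) data in the cap itself, so nothing is pushed to servers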
1401
1402     def __init__(self, helper_furl=None, stats_provider=None):
1403         self._helper_furl = helper_furl
1404         self.stats_provider = stats_provider
1405         self._helper = None
1406         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1407         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1408         service.MultiService.__init__(self)
1409
1410     def startService(self):
1411         service.MultiService.startService(self)
1412         if self._helper_furl:
1413             self.parent.tub.connectTo(self._helper_furl,
1414                                       self._got_helper)
1415
1416     def _got_helper(self, helper):
1417         self.log("got helper connection, getting versions")
1418         default = {
1419             "http://allmydata.org/tahoe/protocols/helper/v1": {},
1420             "application-version": "unknown: no get_version()",
1421             }
1422         d = add_version_to_remote_reference(helper, default)
1423         d.addCallback(self._got_versioned_helper)
1424
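    # once add_version_to_remote_reference has run, helper.version is a dict
    # shaped like 'default' above: protocol-URI keys mapping to parameter
    # dicts, plus an "application-version" string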
1425     def _got_versioned_helper(self, helper):
1426         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1427         if needed not in helper.version:
1428             raise InsufficientVersionError(needed, helper.version)
1429         self._helper = helper
1430         helper.notifyOnDisconnect(self._lost_helper)
1431
1432     def _lost_helper(self):
1433         self._helper = None
1434
1435     def get_helper_info(self):
1436         # return a tuple of (helper_furl_or_None, connected_bool)
1437         return (self._helper_furl, bool(self._helper))
1438
1439
1440     def upload(self, uploadable, history=None):
1441         """
1442         Returns a Deferred that will fire with the UploadResults instance.
1443         """
1444         assert self.parent
1445         assert self.running
1446
1447         uploadable = IUploadable(uploadable)
1448         d = uploadable.get_size()
1449         def _got_size(size):
1450             default_params = self.parent.get_encoding_parameters()
1451             precondition(isinstance(default_params, dict), default_params)
1452             precondition("max_segment_size" in default_params, default_params)
1453             uploadable.set_default_encoding_parameters(default_params)
1454
1455             if self.stats_provider:
1456                 self.stats_provider.count('uploader.files_uploaded', 1)
1457                 self.stats_provider.count('uploader.bytes_uploaded', size)
1458
1459             if size <= self.URI_LIT_SIZE_THRESHOLD:
1460                 uploader = LiteralUploader()
1461                 return uploader.start(uploadable)
1462             else:
1463                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1464                 d2 = defer.succeed(None)
1465                 if self._helper:
1466                     uploader = AssistedUploader(self._helper)
1467                     d2.addCallback(lambda x: eu.get_storage_index())
1468                     d2.addCallback(lambda si: uploader.start(eu, si))
1469                 else:
1470                     storage_broker = self.parent.get_storage_broker()
1471                     secret_holder = self.parent._secret_holder
1472                     uploader = CHKUploader(storage_broker, secret_holder)
1473                     d2.addCallback(lambda x: uploader.start(eu))
1474
1475                 self._all_uploads[uploader] = None
1476                 if history:
1477                     history.add_upload(uploader.get_upload_status())
1478                 def turn_verifycap_into_read_cap(uploadresults):
1479                     # Generate the uri from the verifycap plus the key.
1480                     d3 = uploadable.get_encryption_key()
1481                     def put_readcap_into_results(key):
1482                         v = uri.from_string(uploadresults.verifycapstr)
1483                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1484                         uploadresults.uri = r.to_string()
1485                         return uploadresults
1486                     d3.addCallback(put_readcap_into_results)
1487                     return d3
1488                 d2.addCallback(turn_verifycap_into_read_cap)
1489                 return d2
1490         d.addCallback(_got_size)
1491         def _done(res):
1492             uploadable.close()
1493             return res
1494         d.addBoth(_done)
1495         return d
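
# illustrative sketch, not called anywhere: driving the Uploader service from
# its parent client. 'client' is assumed to be the Client this service is
# attached to, 'data' an arbitrary string.
def _example_upload(client, data):
    uploader = client.getServiceNamed(Uploader.name)
    d = uploader.upload(Data(data, convergence=None))
    # fires with UploadResults; .uri is a LIT URI for files of <= 55 bytes,
    # or a full CHK read cap otherwise
    d.addCallback(lambda results: results.uri)
    return d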