# src/allmydata/immutable/upload.py
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_peers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]

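# --- Illustrative sketch (editor's addition, not part of the original
# module): how a PeerTracker is driven. The constructor arguments shown
# here are hypothetical placeholders; in real uploads they are computed
# by Tahoe2PeerSelector.get_shareholders below.
#
#     tracker = PeerTracker(peerid, storage_server,
#                           sharesize, blocksize, num_segments,
#                           num_share_hashes, storage_index,
#                           renew_secret, cancel_secret)
#     d = tracker.query(set([0, 1]))
#     def _got((alreadygot, allocated)):
#         # 'alreadygot': share numbers the server already holds;
#         # 'allocated': shares it accepted, with RemoteBucketWriter
#         # proxies now waiting in tracker.buckets
#         return allocated
#     d.addCallback(_got)
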
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_servers, already_peers), where upload_servers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shareids are stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = range(total_shares)
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for "
                    "upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the get_buckets queries sent to the readonly
        peers in Tahoe2PeerSelector.get_shareholders.
        """
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                if self.homeless_shares and bucket in self.homeless_shares:
                    self.homeless_shares.remove(bucket)
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("peer selection successful for %s: %s" % (self,
                            self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_peers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    msg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
                            % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (msg, self._get_progress_message()))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when peer selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for peer in self.use_peers:
            assert isinstance(peer, PeerTracker)

            peer.abort()

        raise UploadUnhappinessError(msg)


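# --- Illustrative sketch (editor's addition, not part of the original
# module): the happiness test that _loop applies. Per
# allmydata.util.happinessutil, servers_of_happiness() is the size of a
# maximum matching between servers and distinct shares, so a merged map
# like
#
#     merged = {0: set(["serverA"]),
#               1: set(["serverA"]),
#               2: set(["serverB"])}
#
# has happiness 2: serverA can be matched with share 0 or 1, and serverB
# with share 2. If servers_of_happiness=3 was requested, _loop tries to
# move one of serverA's shares back into homeless_shares so that a third
# server can claim it, and fails the upload only if no redistribution
# can help.
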
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


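    # Illustrative sketch (editor's addition): how _update_segment_hash
    # walks one chunk across segment boundaries. With _segment_size = 4,
    # a single 10-byte chunk is consumed as:
    #
    #     bytes 0-3 -> fills segment 0; digest appended, hasher reset
    #     bytes 4-7 -> fills segment 1; digest appended, hasher reset
    #     bytes 8-9 -> partially fills segment 2 (closed later, by more
    #                  data or by get_plaintext_hashtree_leaves)
    #
    # so the per-segment hashes are independent of how read() happens to
    # chunk the plaintext.
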
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

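    # Illustrative sketch (editor's addition): the Deferred-trampoline
    # used by _read_encrypted above, reduced to its essentials. Each
    # iteration reschedules itself through fireEventually(), so the
    # recursion never grows the Python stack no matter how large the
    # read is:
    #
    #     def _step(remaining, acc, done):
    #         if not remaining:
    #             done.callback(acc)
    #             return
    #         size = min(remaining, CHUNKSIZE)
    #         d = fireEventually(size)      # fires on a later reactor turn
    #         def _work(size):
    #             acc.append(size)          # stand-in for read+encrypt
    #             _step(remaining - size, acc, done)
    #         d.addCallbacks(_work, done.errback)
    #
    # The caller makes `done = defer.Deferred()`, calls _step() once, and
    # waits on `done` for the accumulated result.
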
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

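# --- Illustrative sketch (editor's addition, not part of the original
# module): the key/storage-index relationship that _get_encryptor relies
# on. The AES key comes from IUploadable.get_encryption_key() (for CHK
# uploads it is derived from the plaintext and the convergence secret),
# and the storage index is a tagged hash of that key, so knowing the key
# lets you locate the shares, but knowing the SI does not let you decrypt:
#
#     from allmydata.util.hashutil import storage_index_hash
#     key = "\x00" * 16              # hypothetical 128-bit AES key
#     si = storage_index_hash(key)   # 16 bytes, truncated SHA-256
#     assert len(si) == 16
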
class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_servers, already_peers), encoder):
        """
        @param upload_servers: a sequence of PeerTracker objects that have
                               agreed to hold some shares for us (the
                               shareids are stashed inside the PeerTracker)
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("_send_shares, upload_servers is %s" % (upload_servers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in upload_servers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in upload_servers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

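# --- Illustrative sketch (editor's addition, not part of the original
# module): read_this_many_bytes() re-invokes uploadable.read() until it
# has accumulated exactly `size` bytes, because IUploadable.read(size)
# may return fewer bytes than requested. With a hypothetical uploadable
# that returns at most 4 bytes per call:
#
#     d = read_this_many_bytes(uploadable, 10)
#     # read(10) -> ["abcd"], read(6) -> ["efgh"], read(2) -> ["ij"]
#     d.addCallback(lambda pieces: "".join(pieces))   # -> "abcdefghij"
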
class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

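# --- Illustrative sketch (editor's addition, not part of the original
# module): LiteralUploader never contacts a storage server; the whole
# file is encoded directly into the cap string. Assuming
# uri.LiteralFileURI base32-encodes its data, a 5-byte file would
# round-trip as:
#
#     u = uri.LiteralFileURI("hello")
#     u.to_string()    # -> "URI:LIT:nbswy3dp" (base32 of the plaintext)
#
# which is why very small files get LIT caps instead of CHK caps.
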
1011 class RemoteEncryptedUploadable(Referenceable):
1012     implements(RIEncryptedUploadable)
1013
1014     def __init__(self, encrypted_uploadable, upload_status):
1015         self._eu = IEncryptedUploadable(encrypted_uploadable)
1016         self._offset = 0
1017         self._bytes_sent = 0
1018         self._status = IUploadStatus(upload_status)
1019         # we are responsible for updating the status string while we run, and
1020         # for setting the ciphertext-fetch progress.
1021         self._size = None
1022
1023     def get_size(self):
1024         if self._size is not None:
1025             return defer.succeed(self._size)
1026         d = self._eu.get_size()
1027         def _got_size(size):
1028             self._size = size
1029             return size
1030         d.addCallback(_got_size)
1031         return d
1032
1033     def remote_get_size(self):
1034         return self.get_size()
1035     def remote_get_all_encoding_parameters(self):
1036         return self._eu.get_all_encoding_parameters()
1037
1038     def _read_encrypted(self, length, hash_only):
1039         d = self._eu.read_encrypted(length, hash_only)
1040         def _read(strings):
1041             if hash_only:
1042                 self._offset += length
1043             else:
1044                 size = sum([len(data) for data in strings])
1045                 self._offset += size
1046             return strings
1047         d.addCallback(_read)
1048         return d
1049
1050     def remote_read_encrypted(self, offset, length):
1051         # we don't support seek backwards, but we allow skipping forwards
1052         precondition(offset >= 0, offset)
1053         precondition(length >= 0, length)
1054         lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1055                      level=log.NOISY)
1056         precondition(offset >= self._offset, offset, self._offset)
1057         if offset > self._offset:
1058             # read the data from disk anyways, to build up the hash tree
1059             skip = offset - self._offset
1060             log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1061                     (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1062             d = self._read_encrypted(skip, hash_only=True)
1063         else:
1064             d = defer.succeed(None)
1065
1066         def _at_correct_offset(res):
1067             assert offset == self._offset, "%d != %d" % (offset, self._offset)
1068             return self._read_encrypted(length, hash_only=False)
1069         d.addCallback(_at_correct_offset)
1070
1071         def _read(strings):
1072             size = sum([len(data) for data in strings])
1073             self._bytes_sent += size
1074             return strings
1075         d.addCallback(_read)
1076         return d
1077
1078     def remote_close(self):
1079         return self._eu.close()
1080
1081
1082 class AssistedUploader:
1083
1084     def __init__(self, helper):
1085         self._helper = helper
1086         self._log_number = log.msg("AssistedUploader starting")
1087         self._storage_index = None
1088         self._upload_status = s = UploadStatus()
1089         s.set_helper(True)
1090         s.set_active(True)
1091
1092     def log(self, *args, **kwargs):
1093         if "parent" not in kwargs:
1094             kwargs["parent"] = self._log_number
1095         return log.msg(*args, **kwargs)
1096
1097     def start(self, encrypted_uploadable, storage_index):
1098         """Start uploading the file.
1099
1100         Returns a Deferred that will fire with the UploadResults instance.
1101         """
1102         precondition(isinstance(storage_index, str), storage_index)
1103         self._started = time.time()
1104         eu = IEncryptedUploadable(encrypted_uploadable)
1105         eu.set_upload_status(self._upload_status)
1106         self._encuploadable = eu
1107         self._storage_index = storage_index
1108         d = eu.get_size()
1109         d.addCallback(self._got_size)
1110         d.addCallback(lambda res: eu.get_all_encoding_parameters())
1111         d.addCallback(self._got_all_encoding_parameters)
1112         d.addCallback(self._contact_helper)
1113         d.addCallback(self._build_verifycap)
1114         def _done(res):
1115             self._upload_status.set_active(False)
1116             return res
1117         d.addBoth(_done)
1118         return d
1119
1120     def _got_size(self, size):
1121         self._size = size
1122         self._upload_status.set_size(size)
1123
1124     def _got_all_encoding_parameters(self, params):
1125         k, happy, n, segment_size = params
1126         # stash these for URI generation later
1127         self._needed_shares = k
1128         self._total_shares = n
1129         self._segment_size = segment_size
1130
1131     def _contact_helper(self, res):
1132         now = self._time_contacting_helper_start = time.time()
1133         self._storage_index_elapsed = now - self._started
1134         self.log(format="contacting helper for SI %(si)s..",
1135                  si=si_b2a(self._storage_index))
1136         self._upload_status.set_status("Contacting Helper")
1137         d = self._helper.callRemote("upload_chk", self._storage_index)
1138         d.addCallback(self._contacted_helper)
1139         return d
1140
1141     def _contacted_helper(self, (upload_results, upload_helper)):
1142         now = time.time()
1143         elapsed = now - self._time_contacting_helper_start
1144         self._elapsed_time_contacting_helper = elapsed
1145         if upload_helper:
1146             self.log("helper says we need to upload")
1147             self._upload_status.set_status("Uploading Ciphertext")
1148             # we need to upload the file
1149             reu = RemoteEncryptedUploadable(self._encuploadable,
1150                                             self._upload_status)
1151             # let it pre-compute the size for progress purposes
1152             d = reu.get_size()
1153             d.addCallback(lambda ignored:
1154                           upload_helper.callRemote("upload", reu))
1155             # this Deferred will fire with the upload results
1156             return d
1157         self.log("helper says file is already uploaded")
1158         self._upload_status.set_progress(1, 1.0)
1159         self._upload_status.set_results(upload_results)
1160         return upload_results
1161
1162     def _convert_old_upload_results(self, upload_results):
1163         # pre-1.3.0 helpers return upload results which contain a mapping
1164         # from shnum to a single human-readable string, containing things
1165         # like "Found on [x],[y],[z]" (for healthy files that were already in
1166         # the grid), "Found on [x]" (for files that needed upload but which
1167         # discovered pre-existing shares), and "Placed on [x]" (for newly
1168         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1169         # set of binary serverid strings.
1170
1171         # the old results are too hard to deal with (they don't even contain
1172         # as much information as the new results, since the nodeids are
1173         # abbreviated), so if we detect old results, just clobber them.
1174
1175         sharemap = upload_results.sharemap
1176         if str in [type(v) for v in sharemap.values()]:
1177             upload_results.sharemap = None
1178
    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k, v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
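    # A worked example of the computation above (the numbers follow from
    # the defaults, shown for illustration only): with k=3 and
    # max_segment_size=128 KiB, a 1000-byte file gets
    # segsize = next_multiple(min(131072, 1000), 3) = 1002, while a large
    # file gets next_multiple(131072, 3) = 131073.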

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d
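    # Note, for illustration: the key depends on the plaintext, the
    # encoding parameters, and the convergence secret, so two uploads of
    # identical plaintext with the same parameters and secret derive the
    # same key (and hence the same storage index), which is what allows
    # convergent deduplication. With convergence=None (see below), a
    # random key is used instead and no deduplication is possible.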

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, 2) # seek to end-of-file to measure the size
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
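
    # A minimal usage sketch (the 'client' object and readcap handling
    # here are hypothetical, for illustration only):
    #   uploader = client.getServiceNamed("uploader")
    #   d = uploader.upload(Data("some bytes", convergence=None))
    #   d.addCallback(lambda results: results.uri) # the readcap string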

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash,
                                           v.needed_shares, v.total_shares,
                                           v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
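
    # A rough sketch of the dispatch in upload() (hypothetical data, for
    # illustration only): anything at or below URI_LIT_SIZE_THRESHOLD (55
    # bytes) becomes a LIT URI, which embeds the data itself and contacts
    # no servers; larger files are encrypted and erasure-coded, either
    # locally (CHKUploader) or via the helper (AssistedUploader), and the
    # readcap combines the verifycap fields with the encryption key:
    #
    #   upload(Data("x"*30, None))   # -> URI:LIT:<base32-encoded data>
    #   upload(Data("x"*1000, None)) # -> URI:CHK:<key>:<UEB hash>:<k>:<N>:<size>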