# src/allmydata/immutable/upload.py
# Fix up the behavior of #778, per reviewers' comments
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
26
27 from cStringIO import StringIO
28
29
# Binary size units (powers of two), used when talking about share sizes.
KiB = 2 ** 10
MiB = 2 ** 20
GiB = 2 ** 30
TiB = 2 ** 40
PiB = 2 ** 50
35
class HaveAllPeersError(Exception):
    """Control-flow exception: raised to jump out of the peer-query loop
    once every peer has been contacted."""
39
class TooFullError(Exception):
    """A storage server cannot accept any more data.

    NOTE: this exception logically belongs in the storage package, not here.
    """
43
class UploadResults(Copyable, RemoteCopy):
    """Value object describing the outcome of an immutable-file upload.

    Instances are sent over the wire from a helper to its client as a
    foolscap RemoteCopy, so both the copyable type string and the shape of
    every attribute are compatibility-sensitive.
    """
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None # filled in later by the uploader
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None # filled in on success
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
66
67
# The space we reserve on each server for the URI extension block. A small
# file's extension is currently 846 bytes, and larger files need only a few
# bytes more (the filesize appears in decimal in a few places), so 1000 gives
# us comfortable headroom. Adjust EXTENSION_SIZE if the extension format
# ever grows.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
76
class PeerTracker:
    """Tracks the share-allocation conversation with one storage server.

    A PeerTracker is created per candidate server for a given upload. It
    holds the per-bucket renewal/cancel secrets and the share geometry,
    issues the remote allocate_buckets/get_buckets calls, and wraps each
    remote bucket it is granted in a write-bucket proxy (self.buckets).
    """
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build a throwaway write-bucket proxy purely to learn its class
        # (so we can make more later) and the total space one share of this
        # geometry will occupy on a server.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """Ask this server to allocate buckets for the given share numbers.

        Returns a Deferred that fires with (alreadygot, allocated) as
        produced by _got_reply below.
        """
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """Ask this server which shares it already holds for our storage
        index. Returns a Deferred firing with the remote get_buckets result
        (a dict mapping shnum to a bucket reader)."""
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # Wrap each newly-allocated remote bucket in a write-bucket proxy,
        # remember them in self.buckets, and report back
        # (shares the server already had, shares we just allocated).
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
136
137
class Tahoe2PeerSelector:
    """Peer-selection algorithm for immutable uploads ("Tahoe2").

    Given the server list for a storage index, this first asks read-only
    servers about shares they may already hold, then asks writable servers
    to allocate the remaining shares: one share per server on the first
    pass, then batches on later passes. Success is judged by the
    servers-of-happiness metric, not by raw share placement; if happiness
    cannot be reached, UploadUnhappinessError is raised.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # Query bookkeeping, summarized in _get_progress_message().
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        I am the entry point: I kick off peer selection and drive it via
        _loop until every share is placed or we give up.

        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # Share numbers that do not yet have a server assigned.
        self.homeless_shares = range(total_shares)
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        # Peers filtered out above may still hold shares from an earlier
        # upload; only the first 2*total_shares of them are consulted.
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for "
                    "upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the get_buckets queries sent to the read-only
        peers in get_shareholders. `peer` is the peerid (not the tracker).
        """
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            # Record each share the peer already holds, and stop treating
            # those shares as homeless.
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                if self.homeless_shares and bucket in self.homeless_shares:
                    self.homeless_shares.remove(bucket)
            # A read-only peer cannot take new shares, so the query counts
            # as "full" even when it told us about existing shares.
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Return a human-readable summary of placement progress and of the
        query counters, for use in log and error messages."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """Advance peer selection by one step.

        Returns (use_peers, preexisting_shares) when placement is complete
        and happy, returns a Deferred when another query is in flight, and
        raises UploadUnhappinessError when happiness cannot be reached.
        """
        if not self.homeless_shares:
            # Every share has a candidate home; check whether the placement
            # is actually "happy" enough.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("peer selection successful for %s: %s" % (self,
                            self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        # (Servers left with a single share are dropped
                        # from `items` and never donate that last share.)
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    # If peer_count < needed_shares, then the second error
                    # message is nonsensical, so we use this one.
                    msg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    raise UploadUnhappinessError("%s (%s)" % (msg,
                                                 self._get_progress_message()))

        if self.uncontacted_peers:
            # First pass: ask each fresh peer for exactly one share.
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked. Split the remaining
            # homeless shares evenly across the peers still willing.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            # NOTE(review): placed_shares is computed but never used.
            placed_shares = self.total_shares - len(self.homeless_shares)
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadUnhappinessError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Handle one allocate_buckets reply (or failure) from `peer`, update
        the placement bookkeeping, and re-enter _loop."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            # The shares we asked about are homeless again.
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
510
511
512 class EncryptAnUploadable:
513     """This is a wrapper that takes an IUploadable and provides
514     IEncryptedUploadable."""
515     implements(IEncryptedUploadable)
516     CHUNKSIZE = 50*1024
517
    def __init__(self, original, log_parent=None):
        # The plaintext provider we wrap; adapted to IUploadable.
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None # AES instance, created lazily from the key
        self._plaintext_hasher = plaintext_hasher() # whole-file plaintext hash
        self._plaintext_segment_hasher = None # hasher for the current segment
        self._plaintext_segment_hashes = [] # finished per-segment digests
        self._encoding_parameters = None # cached (k, happy, n, segsize)
        self._file_size = None # cached plaintext size, in bytes
        self._ciphertext_bytes_read = 0
        self._status = None # IUploadStatus, set via set_upload_status
529
    def set_upload_status(self, upload_status):
        # Propagate the status object to the wrapped uploadable as well, so
        # both layers report progress to the same IUploadStatus.
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)
533
534     def log(self, *args, **kwargs):
535         if "facility" not in kwargs:
536             kwargs["facility"] = "upload.encryption"
537         if "parent" not in kwargs:
538             kwargs["parent"] = self._log_number
539         return log.msg(*args, **kwargs)
540
541     def get_size(self):
542         if self._file_size is not None:
543             return defer.succeed(self._file_size)
544         d = self.original.get_size()
545         def _got_size(size):
546             self._file_size = size
547             if self._status:
548                 self._status.set_size(size)
549             return size
550         d.addCallback(_got_size)
551         return d
552
553     def get_all_encoding_parameters(self):
554         if self._encoding_parameters is not None:
555             return defer.succeed(self._encoding_parameters)
556         d = self.original.get_all_encoding_parameters()
557         def _got(encoding_parameters):
558             (k, happy, n, segsize) = encoding_parameters
559             self._segment_size = segsize # used by segment hashers
560             self._encoding_parameters = encoding_parameters
561             self.log("my encoding parameters: %s" % (encoding_parameters,),
562                      level=log.NOISY)
563             return encoding_parameters
564         d.addCallback(_got)
565         return d
566
    def _get_encryptor(self):
        """Return a Deferred firing with the (cached) AES encryptor.

        Fetching the encryption key also computes and caches the storage
        index, which is derived from the key, and reports it to the upload
        status if one is set.
        """
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d
587
    def get_storage_index(self):
        """Return a Deferred firing with the storage index. The SI is
        derived from the encryption key, so this forces key computation
        (via _get_encryptor) first."""
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d
592
593     def _get_segment_hasher(self):
594         p = self._plaintext_segment_hasher
595         if p:
596             left = self._segment_size - self._plaintext_segment_hashed_bytes
597             return p, left
598         p = plaintext_segment_hasher()
599         self._plaintext_segment_hasher = p
600         self._plaintext_segment_hashed_bytes = 0
601         return p, self._segment_size
602
    def _update_segment_hash(self, chunk):
        """Feed a chunk of plaintext into the per-segment hashers.

        The chunk may span segment boundaries; each time a segment fills
        up, its digest is finalized and appended to
        self._plaintext_segment_hashes and a new hasher is started.
        """
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment
626
627
    def read_encrypted(self, length, hash_only):
        """Return a Deferred firing with a list of ciphertext strings.

        If hash_only is true the plaintext is hashed but not returned
        (used when resuming an interrupted upload).
        """
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        # d2 fires (with the ciphertext list) once _read_encrypted finishes.
        d.addCallback(lambda ignored: d2)
        return d
645
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Recursively read and encrypt `remaining` bytes in CHUNKSIZE pieces.

        Appends ciphertext strings to the shared `ciphertext` list, and
        fires `fire_when_done` with that list once everything has been
        processed (or errbacks it if a read fails). Always returns None;
        results flow only through the Deferred.
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
675
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        """Hash (and, unless hash_only, encrypt) a list of plaintext chunks.

        Updates the whole-file plaintext hasher and the per-segment
        hashers, runs each chunk through the encryptor, and tracks
        ciphertext-fetch progress. Returns a list of ciphertext strings
        (empty when hash_only is True).
        """
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            # progress slot 1 is the ciphertext-fetch phase
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata
707
708
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with a tuple of the plaintext segment
        hashes [first:last], closing out a final partial segment first if
        one is still open. Asserts that we agree with the caller about
        the total number of segments."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
726
727     def get_plaintext_hash(self):
728         h = self._plaintext_hasher.digest()
729         return defer.succeed(h)
730
    def close(self):
        # closing us merely closes the underlying uploadable
        return self.original.close()
733
class UploadStatus:
    """Mutable status record for a single upload, for status displays.

    Progress is tracked in three phases (see set_progress): [0] chk
    computation, [1] ciphertext fetch, [2] encode-and-push; each is a
    float in [0, 1].
    """
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None       # binary storage index, once known
        self.size = None                # file size in bytes
        self.helper = False             # True if an upload helper is involved
        self.status = "Not started"     # human-readable status string
        self.progress = [0.0, 0.0, 0.0] # [chk, ciphertext, encode+push]
        self.active = True
        self.results = None             # UploadResults, set when available
        self.counter = self.statusid_counter.next() # unique id for this upload
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
783
class CHKUploader:
    """Upload an immutable CHK file directly to storage servers (no
    helper): peer selection, then encoding and share push via an
    encode.Encoder."""
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        # route everything to the tahoe.upload facility, parented under
        # this uploader's log entry unless the caller says otherwise
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status inactive on success *or* failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection to find a home for every share.

        Returns a Deferred firing with the peer-selector's result, the
        (used_peers, already_peers) tuple consumed by set_shareholders.
        """
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        # every bucket must have come from exactly one tracker
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Fill in self._results (share placement, timings, verify-cap)
        once encoding has finished, and return it."""
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
923
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly `size` bytes from `uploadable`, whose read() may
    return fewer bytes than requested, by re-reading until the total is
    reached.

    @param uploadable: object whose read(n) returns a Deferred firing
                       with a list of data strings
    @param size: total number of bytes to accumulate
    @param prepend_data: list of strings to place before the newly-read
                         data in the result (defaults to empty)
    @return: a Deferred firing with a list of data strings whose lengths
             sum to `size` (plus any prepend_data at the front)
    """
    # use a None default instead of a mutable list literal to avoid the
    # shared-mutable-default pitfall
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # 'num_bytes' rather than 'bytes', to avoid shadowing the builtin
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            # short read: recurse for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
940
class LiteralUploader:
    """Upload tiny files by packing their entire contents into a LIT URI.

    Nothing is sent to any server: once the data has been read and
    joined into the cap string, the upload is complete, so the status
    object is created already-inactive with the chk phase finished.
    """

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)
        self._status = status

    def start(self, uploadable):
        """Read all of `uploadable` and return a Deferred firing with
        the UploadResults holding the literal URI."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _make_cap(data):
            # the file's entire contents become part of the URI itself
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_make_cap)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, lit_uri):
        # record the cap and mark the remaining phases complete
        self._results.uri = lit_uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # nothing to release: the data was fully consumed by start()
        pass

    def get_upload_status(self):
        return self._status
978
class RemoteEncryptedUploadable(Referenceable):
    """Expose an IEncryptedUploadable over foolscap (the
    RIEncryptedUploadable interface) so a remote helper can pull
    ciphertext from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0      # how far into the ciphertext we have consumed
        self._bytes_sent = 0  # bytes actually returned to the helper
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None     # cached size, filled in by get_size()

    def get_size(self):
        # cache the size so later calls don't re-query the source
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read ciphertext and advance self._offset. With hash_only=True
        the data is hashed but discarded, yet still counts as consumed
        (the offset advances by the full requested length)."""
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            # track how much we have actually shipped to the helper
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
1048
1049
class AssistedUploader:
    """Upload a file via an upload helper service.

    We ask the helper about our storage index; if the file is already
    present it hands back finished results, otherwise we feed it
    ciphertext through a RemoteEncryptedUploadable and it performs the
    encode-and-push on our behalf.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # mark the status inactive on success *or* failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; it answers with
        (upload_results, upload_helper_or_None)."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        # if the helper gave us an upload_helper, the file is not already
        # in the grid and we must push the ciphertext to it
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Attach a verify-cap string and our timing data to the helper's
        results, after sanity-checking its encoding parameters against
        our own."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own total under a different key
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1174
class BaseUploadable:
    """Shared encoding-parameter handling for the IUploadable
    implementations below.

    Per-instance overrides (max_segment_size, encoding_param_*) take
    precedence over the class defaults, which may themselves be replaced
    through set_default_encoding_parameters().
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        """Remember the IUploadStatus object to report progress to."""
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Replace our default k/happy/n/max_segment_size values with
        whichever entries are present in the `default_params` dict."""
        assert isinstance(default_params, dict)
        for key, value in default_params.items():
            precondition(isinstance(key, str), key, value)
            precondition(isinstance(value, int), key, value)
        # only overwrite the defaults the caller actually supplied
        replacements = [("k", "default_encoding_param_k"),
                        ("happy", "default_encoding_param_happy"),
                        ("n", "default_encoding_param_n"),
                        ("max_segment_size", "default_max_segment_size")]
        for key, attrname in replacements:
            if key in default_params:
                setattr(self, attrname, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size).

        The tuple is computed once and cached. The segment size is
        shrunk for small files and rounded up to a multiple of k.
        """
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space; it must remain a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1226
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None            # 16-byte AES key, computed lazily
        self.convergence = convergence
        self._size = None           # cached by get_size()

    def _get_encryption_key_convergent(self):
        """Derive (and cache) the convergent encryption key by hashing
        the entire plaintext together with the convergence secret and
        the encoding parameters."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        # non-convergent uploads get a fresh random key, cached so
        # repeated calls agree
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the 16-byte AES encryption key."""
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        # measure by seeking to the end, then rewind; cached after the
        # first call
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1304
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # unlike FileHandle, we opened the file ourselves, so we must
        # also close it
        FileHandle.close(self)
        self._filehandle.close()
1318
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory string in a file-like object so FileHandle
        # can treat it like any other filehandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1329
1330 class Uploader(service.MultiService, log.PrefixingLogMixin):
1331     """I am a service that allows file uploading. I am a service-child of the
1332     Client.
1333     """
1334     implements(IUploader)
1335     name = "uploader"
1336     URI_LIT_SIZE_THRESHOLD = 55
1337
    def __init__(self, helper_furl=None, stats_provider=None):
        # helper_furl: FURL of an optional upload helper; startService
        # will initiate a connection to it if one was given
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None # set by _got_versioned_helper, cleared on disconnect
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
1345
1346     def startService(self):
1347         service.MultiService.startService(self)
1348         if self._helper_furl:
1349             self.parent.tub.connectTo(self._helper_furl,
1350                                       self._got_helper)
1351
1352     def _got_helper(self, helper):
1353         self.log("got helper connection, getting versions")
1354         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1355                     { },
1356                     "application-version": "unknown: no get_version()",
1357                     }
1358         d = add_version_to_remote_reference(helper, default)
1359         d.addCallback(self._got_versioned_helper)
1360
1361     def _got_versioned_helper(self, helper):
1362         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1363         if needed not in helper.version:
1364             raise InsufficientVersionError(needed, helper.version)
1365         self._helper = helper
1366         helper.notifyOnDisconnect(self._lost_helper)
1367
1368     def _lost_helper(self):
1369         self._helper = None
1370
1371     def get_helper_info(self):
1372         # return a tuple of (helper_furl_or_None, connected_bool)
1373         return (self._helper_furl, bool(self._helper))
1374
1375
1376     def upload(self, uploadable, history=None):
1377         """
1378         Returns a Deferred that will fire with the UploadResults instance.
1379         """
1380         assert self.parent
1381         assert self.running
1382
1383         uploadable = IUploadable(uploadable)
1384         d = uploadable.get_size()
1385         def _got_size(size):
1386             default_params = self.parent.get_encoding_parameters()
1387             precondition(isinstance(default_params, dict), default_params)
1388             precondition("max_segment_size" in default_params, default_params)
1389             uploadable.set_default_encoding_parameters(default_params)
1390
1391             if self.stats_provider:
1392                 self.stats_provider.count('uploader.files_uploaded', 1)
1393                 self.stats_provider.count('uploader.bytes_uploaded', size)
1394
1395             if size <= self.URI_LIT_SIZE_THRESHOLD:
1396                 uploader = LiteralUploader()
1397                 return uploader.start(uploadable)
1398             else:
1399                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1400                 d2 = defer.succeed(None)
1401                 if self._helper:
1402                     uploader = AssistedUploader(self._helper)
1403                     d2.addCallback(lambda x: eu.get_storage_index())
1404                     d2.addCallback(lambda si: uploader.start(eu, si))
1405                 else:
1406                     storage_broker = self.parent.get_storage_broker()
1407                     secret_holder = self.parent._secret_holder
1408                     uploader = CHKUploader(storage_broker, secret_holder)
1409                     d2.addCallback(lambda x: uploader.start(eu))
1410
1411                 self._all_uploads[uploader] = None
1412                 if history:
1413                     history.add_upload(uploader.get_upload_status())
1414                 def turn_verifycap_into_read_cap(uploadresults):
1415                     # Generate the uri from the verifycap plus the key.
1416                     d3 = uploadable.get_encryption_key()
1417                     def put_readcap_into_results(key):
1418                         v = uri.from_string(uploadresults.verifycapstr)
1419                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1420                         uploadresults.uri = r.to_string()
1421                         return uploadresults
1422                     d3.addCallback(put_readcap_into_results)
1423                     return d3
1424                 d2.addCallback(turn_verifycap_into_read_cap)
1425                 return d2
1426         d.addCallback(_got_size)
1427         def _done(res):
1428             uploadable.close()
1429             return res
1430         d.addBoth(_done)
1431         return d