]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/upload.py
immutable: test for #1118
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
26
27 from cStringIO import StringIO
28
29
# Binary (power-of-1024) size units, used when expressing share and file
# sizes elsewhere in this module.
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
35
class HaveAllPeersError(Exception):
    """Internal control-flow exception: raised to jump out of the
    peer-contacting loop once every peer has been heard from."""
    # we use this to jump out of the loop
    pass
39
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server reports it has no room for a share."""
    pass
43
class UploadResults(Copyable, RemoteCopy):
    """Record of how a single immutable upload went: sizes, timings, the
    resulting URI, and which servers ended up with which shares. Instances
    are serialized by foolscap and sent from the upload helper to its
    client."""
    implements(IUploadResults)
    # This copytype string is wire-compatibility-critical: it must match the
    # value used on the helper, and it does *not* need to match the
    # fully-qualified package/module/class name -- so never "fix" it.
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # Compatibility note: instances of this class travel from the helper to
    # its client, so changing the shape of an existing attribute may break
    # older peers. Prefer adding new fields over modifying existing ones.

    def __init__(self):
        # per-step elapsed times, keyed by step name, in seconds
        self.timings = {}
        # {shnum: set(serverid)} and its inverse {serverid: set(shnum)}
        self.sharemap = dictutil.DictOfSets()
        self.servermap = dictutil.DictOfSets()
        # the following are filled in as the upload progresses:
        self.file_size = None
        self.uri = None
        self.ciphertext_fetched = None # how much ciphertext the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
66
67
# Our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
76
class PeerTracker:
    """I track the shares that a single storage server holds (or has agreed
    to hold) for one immutable upload.

    I wrap a remote reference to an RIStorageServer, remember the sizing
    parameters needed to build write-bucket proxies, and carry the
    per-bucket renewal/cancel lease secrets for that server."""
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build a throwaway write-bucket proxy just to learn its class (so
        # we can make more later) and the total space one share will occupy.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """Ask my server to allocate buckets for the share numbers in
        `sharenums`. Returns a Deferred that fires (via _got_reply) with
        (alreadygot, allocated) sets of share numbers."""
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """Ask my server which shares it already has for my storage index.
        Returns a Deferred that fires with its get_buckets reply."""
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        # Wrap each newly-allocated remote bucket in a write-bucket proxy,
        # remember it in self.buckets, and report which share numbers we
        # can now write to.
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all of the buckets that I
        allocated. This is a good idea to conserve space on the storage
        server.
        """
        for writer in self.buckets.itervalues(): writer.abort()
145
146
class Tahoe2PeerSelector:
    """I implement the immutable-upload peer-selection algorithm: first ask
    readonly peers about shares they already hold, then offer one share to
    each writable peer on a first pass, and spread the remaining (homeless)
    shares over the accepting peers on later passes, until the
    servers-of-happiness criterion is met or no peers remain."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        # upload_id: string identifying this upload in logs and repr()
        self.upload_id = upload_id
        # query bookkeeping, reported by _get_progress_message()
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # share numbers that still need a home; shares are "placed" by
        # removing them from this list
        self.homeless_shares = range(total_shares)
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for "
                    "upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the get_buckets queries sent to readonly
        peers in get_shareholders. `peer` is the peerid that was queried;
        `res` is either its bucket dict or a Failure.
        """
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            # any share this peer already holds no longer needs a home
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                if self.homeless_shares and bucket in self.homeless_shares:
                    self.homeless_shares.remove(bucket)
            # readonly peers can't accept new shares, so the query counts
            # as "placed none, because the server is full"
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Build the human-readable placement summary used in both the
        success and failure log messages."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """Drive peer selection one step: either finish successfully
        (returning (use_peers, preexisting_shares)), send the next query
        (returning a Deferred that re-enters me via _got_response), or
        give up via _failed, which raises UploadUnhappinessError."""
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("peer selection successful for %s: %s" % (self,
                            self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    msg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    return self._failed("%s (%s)" % (msg, self._get_progress_message()))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            # first pass: offer each fresh peer a single share
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            # spread the remaining shares evenly over the peers left
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """I handle the allocate_buckets reply (or Failure) from `peer` for
        `shares_to_ask`: update the placement bookkeeping, append the peer
        to `put_peer_here` if it accepted everything we asked, then
        re-enter _loop."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when peer selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for peer in self.use_peers:
            assert isinstance(peer, PeerTracker)

            peer.abort()

        raise UploadUnhappinessError(msg)
530
531
532 class EncryptAnUploadable:
533     """This is a wrapper that takes an IUploadable and provides
534     IEncryptedUploadable."""
535     implements(IEncryptedUploadable)
536     CHUNKSIZE = 50*1024
537
538     def __init__(self, original, log_parent=None):
539         self.original = IUploadable(original)
540         self._log_number = log_parent
541         self._encryptor = None
542         self._plaintext_hasher = plaintext_hasher()
543         self._plaintext_segment_hasher = None
544         self._plaintext_segment_hashes = []
545         self._encoding_parameters = None
546         self._file_size = None
547         self._ciphertext_bytes_read = 0
548         self._status = None
549
550     def set_upload_status(self, upload_status):
551         self._status = IUploadStatus(upload_status)
552         self.original.set_upload_status(upload_status)
553
554     def log(self, *args, **kwargs):
555         if "facility" not in kwargs:
556             kwargs["facility"] = "upload.encryption"
557         if "parent" not in kwargs:
558             kwargs["parent"] = self._log_number
559         return log.msg(*args, **kwargs)
560
561     def get_size(self):
562         if self._file_size is not None:
563             return defer.succeed(self._file_size)
564         d = self.original.get_size()
565         def _got_size(size):
566             self._file_size = size
567             if self._status:
568                 self._status.set_size(size)
569             return size
570         d.addCallback(_got_size)
571         return d
572
573     def get_all_encoding_parameters(self):
574         if self._encoding_parameters is not None:
575             return defer.succeed(self._encoding_parameters)
576         d = self.original.get_all_encoding_parameters()
577         def _got(encoding_parameters):
578             (k, happy, n, segsize) = encoding_parameters
579             self._segment_size = segsize # used by segment hashers
580             self._encoding_parameters = encoding_parameters
581             self.log("my encoding parameters: %s" % (encoding_parameters,),
582                      level=log.NOISY)
583             return encoding_parameters
584         d.addCallback(_got)
585         return d
586
587     def _get_encryptor(self):
588         if self._encryptor:
589             return defer.succeed(self._encryptor)
590
591         d = self.original.get_encryption_key()
592         def _got(key):
593             e = AES(key)
594             self._encryptor = e
595
596             storage_index = storage_index_hash(key)
597             assert isinstance(storage_index, str)
598             # There's no point to having the SI be longer than the key, so we
599             # specify that it is truncated to the same 128 bits as the AES key.
600             assert len(storage_index) == 16  # SHA-256 truncated to 128b
601             self._storage_index = storage_index
602             if self._status:
603                 self._status.set_storage_index(storage_index)
604             return e
605         d.addCallback(_got)
606         return d
607
608     def get_storage_index(self):
609         d = self._get_encryptor()
610         d.addCallback(lambda res: self._storage_index)
611         return d
612
613     def _get_segment_hasher(self):
614         p = self._plaintext_segment_hasher
615         if p:
616             left = self._segment_size - self._plaintext_segment_hashed_bytes
617             return p, left
618         p = plaintext_segment_hasher()
619         self._plaintext_segment_hasher = p
620         self._plaintext_segment_hashed_bytes = 0
621         return p, self._segment_size
622
623     def _update_segment_hash(self, chunk):
624         offset = 0
625         while offset < len(chunk):
626             p, segment_left = self._get_segment_hasher()
627             chunk_left = len(chunk) - offset
628             this_segment = min(chunk_left, segment_left)
629             p.update(chunk[offset:offset+this_segment])
630             self._plaintext_segment_hashed_bytes += this_segment
631
632             if self._plaintext_segment_hashed_bytes == self._segment_size:
633                 # we've filled this segment
634                 self._plaintext_segment_hashes.append(p.digest())
635                 self._plaintext_segment_hasher = None
636                 self.log("closed hash [%d]: %dB" %
637                          (len(self._plaintext_segment_hashes)-1,
638                           self._plaintext_segment_hashed_bytes),
639                          level=log.NOISY)
640                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
641                          segnum=len(self._plaintext_segment_hashes)-1,
642                          hash=base32.b2a(p.digest()),
643                          level=log.NOISY)
644
645             offset += this_segment
646
647
648     def read_encrypted(self, length, hash_only):
649         # make sure our parameters have been set up first
650         d = self.get_all_encoding_parameters()
651         # and size
652         d.addCallback(lambda ignored: self.get_size())
653         d.addCallback(lambda ignored: self._get_encryptor())
654         # then fetch and encrypt the plaintext. The unusual structure here
655         # (passing a Deferred *into* a function) is needed to avoid
656         # overflowing the stack: Deferreds don't optimize out tail recursion.
657         # We also pass in a list, to which _read_encrypted will append
658         # ciphertext.
659         ciphertext = []
660         d2 = defer.Deferred()
661         d.addCallback(lambda ignored:
662                       self._read_encrypted(length, ciphertext, hash_only, d2))
663         d.addCallback(lambda ignored: d2)
664         return d
665
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Read, hash, and encrypt 'remaining' bytes of plaintext, one
        chunk at a time.

        Ciphertext strings are appended to the 'ciphertext' list; when all
        bytes have been processed, 'fire_when_done' is fired with that
        list. Recursion happens through the Deferred returned by read(),
        so the Python stack does not grow with the number of chunks.
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
695
696     def _hash_and_encrypt_plaintext(self, data, hash_only):
697         assert isinstance(data, (tuple, list)), type(data)
698         data = list(data)
699         cryptdata = []
700         # we use data.pop(0) instead of 'for chunk in data' to save
701         # memory: each chunk is destroyed as soon as we're done with it.
702         bytes_processed = 0
703         while data:
704             chunk = data.pop(0)
705             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
706                      level=log.NOISY)
707             bytes_processed += len(chunk)
708             self._plaintext_hasher.update(chunk)
709             self._update_segment_hash(chunk)
710             # TODO: we have to encrypt the data (even if hash_only==True)
711             # because pycryptopp's AES-CTR implementation doesn't offer a
712             # way to change the counter value. Once pycryptopp acquires
713             # this ability, change this to simply update the counter
714             # before each call to (hash_only==False) _encryptor.process()
715             ciphertext = self._encryptor.process(chunk)
716             if hash_only:
717                 self.log("  skipping encryption", level=log.NOISY)
718             else:
719                 cryptdata.append(ciphertext)
720             del ciphertext
721             del chunk
722         self._ciphertext_bytes_read += bytes_processed
723         if self._status:
724             progress = float(self._ciphertext_bytes_read) / self._file_size
725             self._status.set_progress(1, progress)
726         return cryptdata
727
728
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred that fires with a tuple of the plaintext
        segment hashes [first:last], closing out the final (possibly
        short) segment's hasher if it is still open."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
746
747     def get_plaintext_hash(self):
748         h = self._plaintext_hasher.digest()
749         return defer.succeed(h)
750
    def close(self):
        # delegate to the wrapped uploadable, which owns the underlying data
        return self.original.close()
753
class UploadStatus:
    """Mutable record of the state and progress of a single upload, as
    shown by the status-display pages."""
    implements(IUploadStatus)
    # gives each UploadStatus instance a unique small integer, exposed
    # through get_counter()
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # three progress fractions:
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # simple accessors
    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        # returned as a tuple so callers cannot mutate our list
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    # simple mutators
    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
803
class CHKUploader:
    """Upload an immutable (CHK) file directly, without a Helper: select
    storage servers, then drive the Encoder to push shares to them."""
    # the peer-selection policy to use
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # set_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        # log under our own parent event and facility unless the caller
        # supplied their own
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        # pipeline: locate servers, tell the encoder about them, then
        # encode and push the shares
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection for this upload.

        Returns a Deferred that fires with the peer selector's result
        (unpacked by set_shareholders as (used_peers, already_peers))."""
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        # each allocated bucket must come from exactly one peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in used_peers]), [(p.buckets, p.peerid) for p in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Record share placement and timing data in self._results and
        return that UploadResults instance."""
        r = self._results
        # record which peer ended up holding each share
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
943
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from an uploadable.

    read() is allowed to return fewer bytes than requested, so we keep
    issuing reads (recursively, through the Deferred) until the total is
    reached. Returns a Deferred that fires with 'prepend_data' plus the
    list of data strings read.
    """
    # use a None sentinel instead of a mutable [] default: default
    # argument objects are created once and shared between calls
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
960
class LiteralUploader:
    """Upload a small file by packing its entire contents into a LIT URI;
    nothing is sent to any storage server."""

    def __init__(self):
        self._results = UploadResults()
        self._status = status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)

    def start(self, uploadable):
        """Return a Deferred that fires with the UploadResults instance."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _got_data(data):
            # pack all the bytes into the URI itself
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_got_data)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, lit_uri):
        self._results.uri = lit_uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # nothing to clean up
        pass

    def get_upload_status(self):
        return self._status
998
class RemoteEncryptedUploadable(Referenceable):
    """Wrap an IEncryptedUploadable so that a remote Helper can pull
    ciphertext from us over foolscap."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the (cached) size."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _cache(size):
            self._size = size
            return size
        d.addCallback(_cache)
        return d

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # advance self._offset by what was consumed: the full 'length'
        # when hash-only (no strings come back worth measuring), else the
        # total size of the returned strings
        d = self._eu.read_encrypted(length, hash_only)
        def _advance(strings):
            if hash_only:
                self._offset += length
            else:
                self._offset += sum([len(data) for data in strings])
            return strings
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _count_sent(strings):
            self._bytes_sent += sum([len(data) for data in strings])
            return strings
        d.addCallback(_count_sent)
        return d

    def remote_close(self):
        return self._eu.close()
1068
1069
class AssistedUploader:
    """Upload a file with the help of a remote Helper: we either learn
    that the file is already present, or we feed our ciphertext (via a
    RemoteEncryptedUploadable) to the Helper, which performs the encoding
    and share placement for us."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        # attach our parent log event unless the caller supplied one
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        # ask the Helper about our storage index; the response tells us
        # whether we must upload ciphertext or the file is already there
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Attach a verify-cap and our timing data to the results
        returned by the Helper, and return them."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # sanity-check that the helper used the encoding parameters we
        # expected
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own notion of total time under a
            # different key before overwriting it with ours
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1194
class BaseUploadable:
    """Encoding-parameter bookkeeping shared by the IUploadable
    implementations below."""
    # built-in fallbacks, used when neither the per-instance attributes
    # nor the client-supplied defaults override them
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-wide default encoding parameters from a dict
        which may contain 'k', 'happy', 'n', and 'max_segment_size'."""
        assert isinstance(default_params, dict)
        for name, value in default_params.items():
            precondition(isinstance(name, str), name, value)
            precondition(isinstance(value, int), name, value)
        # copy whichever recognized keys are present onto our defaults
        for key, attr in [("k", "default_encoding_param_k"),
                          ("happy", "default_encoding_param_happy"),
                          ("n", "default_encoding_param_n"),
                          ("max_segment_size", "default_max_segment_size")]:
            if key in default_params:
                setattr(self, attr, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        computing and caching them on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space; then round up to a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1246
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        # derive the key from a hash of the plaintext plus the convergence
        # secret; cache it so the file is only hashed once
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            k, happy, n, segsize = params
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f = self._filehandle
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            for data in iter(lambda: f.read(BLOCKSIZE), ''):
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        if self._size is None:
            # measure by seeking to the end, then rewind for the reader
            self._filehandle.seek(0, 2)
            self._size = self._filehandle.tell()
            self._filehandle.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        # IUploadable.read() fires with a *list* of data strings
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1324
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # unlike FileHandle, we opened the file ourselves, so we are
        # responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
1338
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the string in a file-like object so FileHandle does the work
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1349
1350 class Uploader(service.MultiService, log.PrefixingLogMixin):
1351     """I am a service that allows file uploading. I am a service-child of the
1352     Client.
1353     """
1354     implements(IUploader)
1355     name = "uploader"
1356     URI_LIT_SIZE_THRESHOLD = 55
1357
1358     def __init__(self, helper_furl=None, stats_provider=None):
1359         self._helper_furl = helper_furl
1360         self.stats_provider = stats_provider
1361         self._helper = None
1362         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1363         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1364         service.MultiService.__init__(self)
1365
1366     def startService(self):
1367         service.MultiService.startService(self)
1368         if self._helper_furl:
1369             self.parent.tub.connectTo(self._helper_furl,
1370                                       self._got_helper)
1371
1372     def _got_helper(self, helper):
1373         self.log("got helper connection, getting versions")
1374         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1375                     { },
1376                     "application-version": "unknown: no get_version()",
1377                     }
1378         d = add_version_to_remote_reference(helper, default)
1379         d.addCallback(self._got_versioned_helper)
1380
1381     def _got_versioned_helper(self, helper):
1382         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1383         if needed not in helper.version:
1384             raise InsufficientVersionError(needed, helper.version)
1385         self._helper = helper
1386         helper.notifyOnDisconnect(self._lost_helper)
1387
1388     def _lost_helper(self):
1389         self._helper = None
1390
1391     def get_helper_info(self):
1392         # return a tuple of (helper_furl_or_None, connected_bool)
1393         return (self._helper_furl, bool(self._helper))
1394
1395
    def upload(self, uploadable, history=None):
        """
        Upload the given IUploadable and return a Deferred that fires with
        the UploadResults instance.

        Files of at most URI_LIT_SIZE_THRESHOLD bytes become LIT URIs via
        LiteralUploader.  Larger files are encrypted and sent through the
        helper (AssistedUploader) when one is connected, otherwise
        directly to storage servers (CHKUploader).  If 'history' is given,
        the upload's status object is registered with it.  The uploadable
        is closed when the upload completes, whether it succeeds or fails.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # Default k/happy/N and segment-size parameters come from the
            # parent client and are handed to the uploadable.
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # Tiny files are stored entirely inside the URI itself.
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                # NOTE(review): nothing visible sets self._parentmsgid --
                # presumably provided by log.PrefixingLogMixin; confirm.
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # Helper-assisted upload: the helper is keyed by
                    # storage index, so compute that first.
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # Direct upload to storage servers.
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                # Track for debugging; weak keys, so we never keep an
                # uploader alive.
                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # addBoth: close the uploadable on success AND failure, then
            # pass the result (or Failure) through unchanged.
            uploadable.close()
            return res
        d.addBoth(_done)
        return d