# src/allmydata/immutable/upload.py (from the tahoe-lafs project)
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
26
27 from cStringIO import StringIO
28
29
# Binary (power-of-1024) size units, used when talking about share and
# file sizes (e.g. the 12GiB limit mentioned in get_shareholders).
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
35
class HaveAllPeersError(Exception):
    """Control-flow exception used to break out of a peer-query loop."""
    # we use this to jump out of the loop
    pass
39
# this wants to live in storage, not here
class TooFullError(Exception):
    """A storage server was too full to satisfy a request.

    NOTE(review): not raised anywhere in this file's visible code."""
    pass
43
class UploadResults(Copyable, RemoteCopy):
    """A bundle of facts about a completed upload, serializable over
    foolscap so the helper can send it back to its client."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        # basic facts about the file
        self.file_size = None
        self.uri = None
        # where the shares ended up
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
        # performance/progress details
        self.timings = {} # dict of name to number of seconds
        self.ciphertext_fetched = None # how much the helper fetched
66
67
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
# (Used by PeerTracker and Tahoe2PeerSelector when sizing the
# allocate_buckets requests sent to storage servers.)
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
76
77 class PeerTracker:
78     def __init__(self, peerid, storage_server,
79                  sharesize, blocksize, num_segments, num_share_hashes,
80                  storage_index,
81                  bucket_renewal_secret, bucket_cancel_secret):
82         precondition(isinstance(peerid, str), peerid)
83         precondition(len(peerid) == 20, peerid)
84         self.peerid = peerid
85         self._storageserver = storage_server # to an RIStorageServer
86         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
87         self.sharesize = sharesize
88
89         wbp = layout.make_write_bucket_proxy(None, sharesize,
90                                              blocksize, num_segments,
91                                              num_share_hashes,
92                                              EXTENSION_SIZE, peerid)
93         self.wbp_class = wbp.__class__ # to create more of them
94         self.allocated_size = wbp.get_allocated_size()
95         self.blocksize = blocksize
96         self.num_segments = num_segments
97         self.num_share_hashes = num_share_hashes
98         self.storage_index = storage_index
99
100         self.renew_secret = bucket_renewal_secret
101         self.cancel_secret = bucket_cancel_secret
102
103     def __repr__(self):
104         return ("<PeerTracker for peer %s and SI %s>"
105                 % (idlib.shortnodeid_b2a(self.peerid),
106                    si_b2a(self.storage_index)[:5]))
107
108     def query(self, sharenums):
109         d = self._storageserver.callRemote("allocate_buckets",
110                                            self.storage_index,
111                                            self.renew_secret,
112                                            self.cancel_secret,
113                                            sharenums,
114                                            self.allocated_size,
115                                            canary=Referenceable())
116         d.addCallback(self._got_reply)
117         return d
118
119     def ask_about_existing_shares(self):
120         return self._storageserver.callRemote("get_buckets",
121                                               self.storage_index)
122
123     def _got_reply(self, (alreadygot, buckets)):
124         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
125         b = {}
126         for sharenum, rref in buckets.iteritems():
127             bp = self.wbp_class(rref, self.sharesize,
128                                 self.blocksize,
129                                 self.num_segments,
130                                 self.num_share_hashes,
131                                 EXTENSION_SIZE,
132                                 self.peerid)
133             b[sharenum] = bp
134         self.buckets.update(b)
135         return (alreadygot, set(b.keys()))
136
137
class Tahoe2PeerSelector:
    """The "Tahoe2" peer-selection algorithm: first ask readonly peers
    about shares they already hold, then hand out homeless shares to
    writable peers (one share per peer on the first pass, batches on
    later passes) until every share is placed and the
    servers-of-happiness criterion is met, or raise
    UploadUnhappinessError."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query_count: total queries sent; good/bad: queries that did /
        # did not place at least one share (see _got_response)
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0 # queries that failed with an exception
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        Find peers to hold our shares, querying readonly peers about
        existing shares first and then driving _loop until placement
        succeeds or fails.

        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # share numbers that do not yet have a home
        self.homeless_shares = range(total_shares)
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        if not all_peers:
            raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        # peers near the front of the list (out to 2*total_shares) that
        # cannot accept a share of this size; we still ask them about
        # existing shares below
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        # wrap each (peerid, conn) pair in a PeerTracker carrying the
        # per-bucket renewal/cancel secrets for that peer
        def _make_trackers(peers):
           return [PeerTracker(peerid, conn,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               storage_index,
                               bucket_renewal_secret_hash(file_renewal_secret,
                                                          peerid),
                               bucket_cancel_secret_hash(file_cancel_secret,
                                                         peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            ds.append(d)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for "
                    "upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, peer):
        """
        I handle responses (or failures) to the get_buckets queries that
        get_shareholders sends to readonly peers. `peer` is the peerid.
        """
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.add(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            # a share this peer already holds no longer needs a home
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                if self.homeless_shares and bucket in self.homeless_shares:
                    self.homeless_shares.remove(bucket)
            # readonly peers cannot accept new shares, so for the progress
            # message this query counts as "placed none because full"
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Return a human-readable summary of placement progress and the
        query statistics, used in both success and failure messages."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """One step of the selection state machine: when every share has a
        home, check the happiness criterion (possibly pulling some
        pre-existing shares back into homeless_shares so they can be
        redistributed); otherwise send the next allocate_buckets query.
        Raises UploadUnhappinessError when placement cannot succeed."""
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("peer selection successful for %s: %s" % (self,
                            self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            # server may still have spare shares, so keep
                            # it in the rotation
                            items.append((server, sharelist))
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    msg = failure_message(peer_count,
                                          self.needed_shares,
                                          self.servers_of_happiness,
                                          effective_happiness)
                    raise UploadUnhappinessError("%s (%s)" % (msg,
                                                 self._get_progress_message()))

        if self.uncontacted_peers:
            # first pass: ask each fresh peer for a single share
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            # spread the remaining shares evenly over the willing peers
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            # NOTE(review): placed_shares is computed but never used.
            placed_shares = self.total_shares - len(self.homeless_shares)
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                                msg,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadUnhappinessError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Handle the reply (or failure) from an allocate_buckets query to
        `peer` (a PeerTracker), update the bookkeeping, and continue the
        loop. Peers that accepted everything we asked for are appended to
        `put_peer_here` so a later pass can ask them for more."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
508
509
510 class EncryptAnUploadable:
511     """This is a wrapper that takes an IUploadable and provides
512     IEncryptedUploadable."""
513     implements(IEncryptedUploadable)
514     CHUNKSIZE = 50*1024
515
    def __init__(self, original, log_parent=None):
        # the wrapped IUploadable that supplies plaintext
        self.original = IUploadable(original)
        self._log_number = log_parent # parent log entry for our messages
        self._encryptor = None # AES instance, created lazily by _get_encryptor()
        # flat hasher over the whole plaintext stream
        self._plaintext_hasher = plaintext_hasher()
        # per-segment hasher and the digests of completed segments
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None # cached (k, happy, n, segment_size)
        self._file_size = None # cached result of original.get_size()
        self._ciphertext_bytes_read = 0 # count of ciphertext bytes read so far
        self._status = None # IUploadStatus, set by set_upload_status()
527
528     def set_upload_status(self, upload_status):
529         self._status = IUploadStatus(upload_status)
530         self.original.set_upload_status(upload_status)
531
532     def log(self, *args, **kwargs):
533         if "facility" not in kwargs:
534             kwargs["facility"] = "upload.encryption"
535         if "parent" not in kwargs:
536             kwargs["parent"] = self._log_number
537         return log.msg(*args, **kwargs)
538
539     def get_size(self):
540         if self._file_size is not None:
541             return defer.succeed(self._file_size)
542         d = self.original.get_size()
543         def _got_size(size):
544             self._file_size = size
545             if self._status:
546                 self._status.set_size(size)
547             return size
548         d.addCallback(_got_size)
549         return d
550
551     def get_all_encoding_parameters(self):
552         if self._encoding_parameters is not None:
553             return defer.succeed(self._encoding_parameters)
554         d = self.original.get_all_encoding_parameters()
555         def _got(encoding_parameters):
556             (k, happy, n, segsize) = encoding_parameters
557             self._segment_size = segsize # used by segment hashers
558             self._encoding_parameters = encoding_parameters
559             self.log("my encoding parameters: %s" % (encoding_parameters,),
560                      level=log.NOISY)
561             return encoding_parameters
562         d.addCallback(_got)
563         return d
564
    def _get_encryptor(self):
        """Return a Deferred that fires with the AES encryptor, creating it
        (and deriving and recording the storage index from the encryption
        key) on first use."""
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d
585
586     def get_storage_index(self):
587         d = self._get_encryptor()
588         d.addCallback(lambda res: self._storage_index)
589         return d
590
591     def _get_segment_hasher(self):
592         p = self._plaintext_segment_hasher
593         if p:
594             left = self._segment_size - self._plaintext_segment_hashed_bytes
595             return p, left
596         p = plaintext_segment_hasher()
597         self._plaintext_segment_hasher = p
598         self._plaintext_segment_hashed_bytes = 0
599         return p, self._segment_size
600
    def _update_segment_hash(self, chunk):
        """Feed `chunk` of plaintext into the per-segment hashers, closing
        out a segment digest each time _segment_size bytes have been
        accumulated. A single chunk may span several segment boundaries."""
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            # hash only up to the nearer of chunk-end and segment-end
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment
624
625
626     def read_encrypted(self, length, hash_only):
627         # make sure our parameters have been set up first
628         d = self.get_all_encoding_parameters()
629         # and size
630         d.addCallback(lambda ignored: self.get_size())
631         d.addCallback(lambda ignored: self._get_encryptor())
632         # then fetch and encrypt the plaintext. The unusual structure here
633         # (passing a Deferred *into* a function) is needed to avoid
634         # overflowing the stack: Deferreds don't optimize out tail recursion.
635         # We also pass in a list, to which _read_encrypted will append
636         # ciphertext.
637         ciphertext = []
638         d2 = defer.Deferred()
639         d.addCallback(lambda ignored:
640                       self._read_encrypted(length, ciphertext, hash_only, d2))
641         d.addCallback(lambda ignored: d2)
642         return d
643
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Worker for read_encrypted(): read, hash, and encrypt plaintext
        one CHUNKSIZE piece at a time, appending results to 'ciphertext'.

        Reschedules itself until 'remaining' reaches zero, then fires
        'fire_when_done' with the accumulated ciphertext list. Errors from
        read() are routed to fire_when_done.errback().
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
673
674     def _hash_and_encrypt_plaintext(self, data, hash_only):
675         assert isinstance(data, (tuple, list)), type(data)
676         data = list(data)
677         cryptdata = []
678         # we use data.pop(0) instead of 'for chunk in data' to save
679         # memory: each chunk is destroyed as soon as we're done with it.
680         bytes_processed = 0
681         while data:
682             chunk = data.pop(0)
683             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
684                      level=log.NOISY)
685             bytes_processed += len(chunk)
686             self._plaintext_hasher.update(chunk)
687             self._update_segment_hash(chunk)
688             # TODO: we have to encrypt the data (even if hash_only==True)
689             # because pycryptopp's AES-CTR implementation doesn't offer a
690             # way to change the counter value. Once pycryptopp acquires
691             # this ability, change this to simply update the counter
692             # before each call to (hash_only==False) _encryptor.process()
693             ciphertext = self._encryptor.process(chunk)
694             if hash_only:
695                 self.log("  skipping encryption", level=log.NOISY)
696             else:
697                 cryptdata.append(ciphertext)
698             del ciphertext
699             del chunk
700         self._ciphertext_bytes_read += bytes_processed
701         if self._status:
702             progress = float(self._ciphertext_bytes_read) / self._file_size
703             self._status.set_progress(1, progress)
704         return cryptdata
705
706
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with the tuple of plaintext segment
        hashes [first:last], closing out a final partially-filled segment
        hasher if one is still open."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
724
725     def get_plaintext_hash(self):
726         h = self._plaintext_hasher.digest()
727         return defer.succeed(h)
728
729     def close(self):
730         return self.original.close()
731
class UploadStatus:
    """Mutable progress/status record for a single upload operation."""
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.started = time.time()
        # each status instance gets a unique serial number
        self.counter = self.statusid_counter.next()
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # progress fractions: [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None

    def get_started(self):
        return self.started
    def get_counter(self):
        return self.counter

    def get_storage_index(self):
        return self.storage_index
    def set_storage_index(self, si):
        self.storage_index = si

    def get_size(self):
        return self.size
    def set_size(self, size):
        self.size = size

    def using_helper(self):
        return self.helper
    def set_helper(self, helper):
        self.helper = helper

    def get_status(self):
        return self.status
    def set_status(self, status):
        self.status = status

    def get_progress(self):
        return tuple(self.progress)
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    def get_active(self):
        return self.active
    def set_active(self, value):
        self.active = value

    def get_results(self):
        return self.results
    def set_results(self, value):
        self.results = value
781
class CHKUploader:
    """Upload one immutable file directly to storage servers (no helper).

    Servers are chosen with peer_selector_class, then an encode.Encoder
    pushes the shares; the helper-assisted path lives in AssistedUploader
    instead.
    """
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        # route messages to the tahoe.upload facility, parented under this
        # upload's own log entry unless the caller says otherwise
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status inactive on success and failure alike
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        # pipeline: parameters -> peer selection -> share placement ->
        # encoding -> result bookkeeping
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Choose which servers will hold shares, via peer_selector_class.

        Returns the peer selector's get_shareholders() Deferred, whose
        result is later unpacked by set_shareholders(). Also records
        elapsed-time measurements for the results.
        """
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        # NOTE(review): self._peer_selection_started appears unused (_done
        # below uses the local peer_selection_started) -- candidate for
        # removal
        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """Hand the selected buckets and the share->server map to the encoder.

        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        # each share number should be held by exactly one tracker
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        # record which server ended up with each placed share
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
921
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable', retrying short reads.

    'uploadable' must offer read(length) returning a Deferred that fires
    with a list of data strings. Returns a Deferred that fires with
    'prepend_data' (if any) followed by the strings read, totalling
    exactly 'size' bytes. A read() that returns no data, or more than
    requested, trips an assertion in the callback chain.
    """
    if prepend_data is None:
        # avoid the shared-mutable-default-argument pitfall: build a
        # fresh list for each call
        prepend_data = []
    if size == 0:
        # nothing left to read; don't drop data accumulated so far
        # (the old code returned [] here, discarding prepend_data)
        return defer.succeed(prepend_data)
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # 'num_bytes' rather than 'bytes', to avoid shadowing the builtin
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            # short read: ask again for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
938
class LiteralUploader:
    """Uploader for files small enough to live entirely inside a LIT URI:
    the data is embedded in the URI itself, so no shares are pushed."""

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        # there is no CHK/peer-selection phase for a literal file
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)
        self._status = status

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # pack all the data into a single LIT URI string
        d.addCallback(lambda pieces:
                      uri.LiteralFileURI("".join(pieces)).to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, lit_uri):
        # 'lit_uri', not 'uri': don't shadow the module-level uri import
        self._results.uri = lit_uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # nothing to release: we never opened anything
        pass

    def get_upload_status(self):
        return self._status
976
class RemoteEncryptedUploadable(Referenceable):
    """Foolscap wrapper that lets a remote helper pull ciphertext from a
    local IEncryptedUploadable.

    Reads must proceed strictly forwards; a forward skip is satisfied
    locally with hash_only=True so the plaintext hashes still get built
    for the skipped range.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # next byte offset the helper is expected to ask for
        self._offset = 0
        # ciphertext bytes actually shipped (skips excluded)
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        # cache the size after the first lookup
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read 'length' bytes of ciphertext, advancing self._offset.

        With hash_only=True no data comes back, so we assume the whole
        'length' was consumed; otherwise we advance by the bytes actually
        returned.
        """
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            # track how much ciphertext was actually shipped to the helper
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
1046
1047
class AssistedUploader:
    """Upload a file with the help of a remote helper service.

    The helper is asked (via the 'upload_chk' remote method) whether it
    already has the file; if not, we feed it our ciphertext through a
    RemoteEncryptedUploadable and it places the shares for us.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        # parent all messages under this upload's log entry by default
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # pipeline: size -> encoding params -> helper negotiation ->
        # verify-cap construction
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # mark the status inactive on success and failure alike
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; fires with its answer."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        # a non-None upload_helper means the helper lacks the file and we
        # must send it our ciphertext; otherwise upload_results is final
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Attach the verify-cap string and timing info to the results."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # sanity-check that the helper used the parameters we expected
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own "total" under a different key
            # before recording our end-to-end elapsed time
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1172
class BaseUploadable:
    """Shared encoding-parameter plumbing for the IUploadable classes
    below: per-instance overrides, client-supplied defaults, and the
    computed (k, happy, n, segsize) tuple."""
    # client-wide defaults, replaced via set_default_encoding_parameters()
    default_max_segment_size = 128*KiB
    default_encoding_param_k = 3
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "use the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # memoized result of get_all_encoding_parameters()
    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-supplied defaults from a {str: int} dict."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        # copy only the keys that were actually provided
        for key, attrname in [("k", "default_encoding_param_k"),
                              ("happy", "default_encoding_param_happy"),
                              ("n", "default_encoding_param_n"),
                              ("max_segment_size", "default_max_segment_size")]:
            if key in default_params:
                setattr(self, attrname, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size)."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting
            # space, but keep it a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1224
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """Wrap an open filehandle as an IUploadable.

        If 'convergence' is None a random encryption key is chosen;
        otherwise the plaintext hash is combined with the convergence
        string to derive a deterministic (convergent) key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self.convergence = convergence
        self._filehandle = filehandle
        self._size = None
        self._key = None

    def _get_encryption_key_convergent(self):
        # derive (and cache) the convergent key by hashing the plaintext
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # get_size() fills in self._size, which the progress math needs
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            k, happy, n, segsize = params
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f = self._filehandle
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                block = f.read(BLOCKSIZE)
                if not block:
                    break
                enckey_hasher.update(block)
                bytes_read += len(block)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            # rewind so the subsequent read() pass starts at the beginning
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        # generate (and cache) a random key
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        if self._size is None:
            # measure by seeking to the end, then rewind for the reader
            self._filehandle.seek(0, 2)
            self._size = self._filehandle.tell()
            self._filehandle.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        # IUploadable.read() fires with a *list* of data strings
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1302
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """Upload the contents of the named file.

        Convergence semantics match FileHandle: None picks a random
        encryption key, any other string seeds convergent key derivation.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # unlike FileHandle, we opened the file, so we must also close it
        FileHandle.close(self)
        self._filehandle.close()
1316
class Data(FileHandle):
    def __init__(self, data, convergence):
        """Upload an in-memory string of data.

        Convergence semantics match FileHandle: None picks a random
        encryption key, any other string seeds convergent key derivation.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1327
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    # Files of at most this many bytes are turned into "literal" URIs by
    # LiteralUploader (see upload() below) instead of being encoded into
    # shares.
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        """Create the uploader service.

        helper_furl: if given, we connect to this upload-helper FURL when
        the service starts, and route non-literal uploads through the
        helper while connected.
        stats_provider: optional; if given, its counters are bumped once
        per upload.
        """
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None # set in _got_versioned_helper, cleared in _lost_helper
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            # connectTo invokes _got_helper on each (re)connection to the
            # helper
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # fallback version data, used when the remote helper is too old to
        # offer get_version()
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        # refuse to use a helper that does not speak the v1 protocol
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        # forget the helper; subsequent uploads use the direct CHKUploader
        # path until we reconnect
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        # we must be a running service-child of the Client: we use
        # self.parent for encoding parameters and the storage broker
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # small files become self-contained "literal" URIs
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                # NOTE(review): _parentmsgid is presumably set by
                # log.PrefixingLogMixin.__init__ -- confirm
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # route the upload through the connected helper; it
                    # is started with the file's storage index
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # no helper: perform the CHK upload ourselves
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # close the uploadable whether the upload succeeded or failed
            uploadable.close()
            return res
        d.addBoth(_done)
        return d