# tahoe-lafs: src/allmydata/immutable/upload.py
# (at commit "upload: tidy up logging messages")
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
26
27 from cStringIO import StringIO
28
29
30 KiB=1024
31 MiB=1024*KiB
32 GiB=1024*MiB
33 TiB=1024*GiB
34 PiB=1024*TiB
35
36 class HaveAllPeersError(Exception):
37     # we use this to jump out of the loop
38     pass
39
40 # this wants to live in storage, not here
41 class TooFullError(Exception):
42     pass
43
44 class UploadResults(Copyable, RemoteCopy):
45     implements(IUploadResults)
46     # note: don't change this string, it needs to match the value used on the
47     # helper, and it does *not* need to match the fully-qualified
48     # package/module/class name
49     typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
50     copytype = typeToCopy
51
52     # also, think twice about changing the shape of any existing attribute,
53     # because instances of this class are sent from the helper to its client,
54     # so changing this may break compatibility. Consider adding new fields
55     # instead of modifying existing ones.
56
57     def __init__(self):
58         self.timings = {} # dict of name to number of seconds
59         self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
60         self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
61         self.file_size = None
62         self.ciphertext_fetched = None # how much the helper fetched
63         self.uri = None
64         self.preexisting_shares = None # count of shares already present
65         self.pushed_shares = None # count of shares we pushed
66
67
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
73 EXTENSION_SIZE = 1000
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
75 # this.
76
77 def pretty_print_shnum_to_servers(s):
78     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
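
# (Illustrative note, not part of the original module: for
# s = {0: set([peerid_a]), 1: set([peerid_a, peerid_b])} this returns
# something like "sh0: ivjakubr, sh1: ivjakubr+u33m4y7c", where each
# token is a shortened base32 nodeid; the ids here are made up.)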
79
80 class PeerTracker:
81     def __init__(self, peerid, storage_server,
82                  sharesize, blocksize, num_segments, num_share_hashes,
83                  storage_index,
84                  bucket_renewal_secret, bucket_cancel_secret):
85         precondition(isinstance(peerid, str), peerid)
86         precondition(len(peerid) == 20, peerid)
87         self.peerid = peerid
88         self._storageserver = storage_server # to an RIStorageServer
89         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
90         self.sharesize = sharesize
91
92         wbp = layout.make_write_bucket_proxy(None, sharesize,
93                                              blocksize, num_segments,
94                                              num_share_hashes,
95                                              EXTENSION_SIZE, peerid)
96         self.wbp_class = wbp.__class__ # to create more of them
97         self.allocated_size = wbp.get_allocated_size()
98         self.blocksize = blocksize
99         self.num_segments = num_segments
100         self.num_share_hashes = num_share_hashes
101         self.storage_index = storage_index
102
103         self.renew_secret = bucket_renewal_secret
104         self.cancel_secret = bucket_cancel_secret
105
106     def __repr__(self):
107         return ("<PeerTracker for peer %s and SI %s>"
108                 % (idlib.shortnodeid_b2a(self.peerid),
109                    si_b2a(self.storage_index)[:5]))
110
111     def query(self, sharenums):
112         d = self._storageserver.callRemote("allocate_buckets",
113                                            self.storage_index,
114                                            self.renew_secret,
115                                            self.cancel_secret,
116                                            sharenums,
117                                            self.allocated_size,
118                                            canary=Referenceable())
119         d.addCallback(self._got_reply)
120         return d
121
122     def ask_about_existing_shares(self):
123         return self._storageserver.callRemote("get_buckets",
124                                               self.storage_index)
125
126     def _got_reply(self, (alreadygot, buckets)):
127         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
128         b = {}
129         for sharenum, rref in buckets.iteritems():
130             bp = self.wbp_class(rref, self.sharesize,
131                                 self.blocksize,
132                                 self.num_segments,
133                                 self.num_share_hashes,
134                                 EXTENSION_SIZE,
135                                 self.peerid)
136             b[sharenum] = bp
137         self.buckets.update(b)
138         return (alreadygot, set(b.keys()))
139
140
141     def abort(self):
142         """
143         I abort the remote bucket writers for all shares, which frees
144         the space that was allocated for them on the storage server.
145         """
146         self.abort_some_buckets(self.buckets.keys())
147
148     def abort_some_buckets(self, sharenums):
149         """
150         I abort the remote bucket writers for the share numbers in sharenums.
151         """
152         for sharenum in sharenums:
153             if sharenum in self.buckets:
154                 self.buckets[sharenum].abort()
155                 del self.buckets[sharenum]
156
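# Illustrative sketch (not part of the original module): a PeerTracker
# wraps one storage server for a single upload. query() maps onto the
# remote allocate_buckets() call and fires with (alreadygot, allocated):
#
#   t = PeerTracker(peerid, rref, sharesize, blocksize, num_segments,
#                   num_share_hashes, si, renew_secret, cancel_secret)
#   d = t.query(set([0, 1]))
#   d.addCallback(lambda (alreadygot, allocated): allocated)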
157
158 def str_shareloc(shnum, bucketwriter):
159     return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
160
161 class Tahoe2PeerSelector(log.PrefixingLogMixin):
162
163     def __init__(self, upload_id, logparent=None, upload_status=None):
164         self.upload_id = upload_id
165         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
166         # Peers that are working normally, but full.
167         self.full_count = 0
168         self.error_count = 0
169         self.num_peers_contacted = 0
170         self.last_failure_msg = None
171         self._status = IUploadStatus(upload_status)
172         log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
173         self.log("starting", level=log.OPERATIONAL)
174
175     def __repr__(self):
176         return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
177
178     def get_shareholders(self, storage_broker, secret_holder,
179                          storage_index, share_size, block_size,
180                          num_segments, total_shares, needed_shares,
181                          servers_of_happiness):
182         """
183         @return: (upload_servers, already_peers), where upload_servers is a set of
184                  PeerTracker instances that have agreed to hold some shares
185                  for us (the shareids are stashed inside the PeerTracker),
186                  and already_peers is a dict mapping shnum to a set of peers
187                  which claim to already have the share.
188         """
189
190         if self._status:
191             self._status.set_status("Contacting Peers..")
192
193         self.total_shares = total_shares
194         self.servers_of_happiness = servers_of_happiness
195         self.needed_shares = needed_shares
196
197         self.homeless_shares = set(range(total_shares))
198         self.contacted_peers = [] # peers worth asking again
199         self.contacted_peers2 = [] # peers that we have asked again
200         self._started_second_pass = False
201         self.use_peers = set() # PeerTrackers that have shares assigned to them
202         self.preexisting_shares = {} # shareid => set(peerids) holding shareid
203         # We don't try to allocate shares to these servers, since they've said
204         # that they're incapable of storing shares of the size that we'd want
205         # to store. We keep them around because they may have existing shares
206         # for this storage index, which we want to know about for accurate
207         # servers_of_happiness accounting
208         # (this is eventually a list, but it is initialized later)
209         self.readonly_peers = None
210         # These peers have shares -- any shares -- for our SI. We keep
211         # track of these to write an error message with them later.
212         self.peers_with_shares = set()
213
214         # this needed_hashes computation should mirror
215         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
216         # (instead of a HashTree) because we don't require actual hashing
217         # just to count the levels.
218         ht = hashtree.IncompleteHashTree(total_shares)
219         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
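        # (Illustrative, not in the original: with total_shares=10 the
        # tree is padded to 16 leaves, so one share's chain needs its own
        # leaf hash plus roughly one "uncle" hash per level of the tree.)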
220
221         # figure out how much space to ask for
222         wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
223                                              num_share_hashes, EXTENSION_SIZE,
224                                              None)
225         allocated_size = wbp.get_allocated_size()
226         all_peers = storage_broker.get_servers_for_index(storage_index)
227         if not all_peers:
228             raise NoServersError("client gave us zero peers")
229
230         # filter the list of peers according to which ones can accommodate
231         # this request. This excludes older peers (which used a 4-byte size
232         # field) from getting large shares (for files larger than about
233         # 12GiB). See #439 for details.
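        # (Worked example, not in the original: a 4-byte size field tops
        # out at 2**32 bytes = 4GiB per share, and with the default k=3
        # each share is about file_size/3, hence the ~12GiB threshold.)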
234         def _get_maxsize(peer):
235             (peerid, conn) = peer
236             v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
237             return v1["maximum-immutable-share-size"]
238         writable_peers = [peer for peer in all_peers
239                           if _get_maxsize(peer) >= allocated_size]
240         readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
241
242         # decide upon the renewal/cancel secrets, to include them in the
243         # allocate_buckets query.
244         client_renewal_secret = secret_holder.get_renewal_secret()
245         client_cancel_secret = secret_holder.get_cancel_secret()
246
247         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
248                                                        storage_index)
249         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
250                                                      storage_index)
251         def _make_trackers(peers):
252            return [PeerTracker(peerid, conn,
253                                share_size, block_size,
254                                num_segments, num_share_hashes,
255                                storage_index,
256                                bucket_renewal_secret_hash(file_renewal_secret,
257                                                           peerid),
258                                bucket_cancel_secret_hash(file_cancel_secret,
259                                                          peerid))
260                     for (peerid, conn) in peers]
261         self.uncontacted_peers = _make_trackers(writable_peers)
262         self.readonly_peers = _make_trackers(readonly_peers)
263         # We now ask peers that can't hold any new shares about existing
264         # shares that they might have for our SI. Once this is done, we
265         # start placing the shares that we haven't already accounted
266         # for.
267         ds = []
268         if self._status and self.readonly_peers:
269             self._status.set_status("Contacting readonly peers to find "
270                                     "any existing shares")
271         for peer in self.readonly_peers:
272             assert isinstance(peer, PeerTracker)
273             d = peer.ask_about_existing_shares()
274             d.addBoth(self._handle_existing_response, peer.peerid)
275             ds.append(d)
276             self.num_peers_contacted += 1
277             self.query_count += 1
278             self.log("asking peer %s for any existing shares" %
279                      (idlib.shortnodeid_b2a(peer.peerid),),
280                     level=log.NOISY)
281         dl = defer.DeferredList(ds)
282         dl.addCallback(lambda ign: self._loop())
283         return dl
284
285
286     def _handle_existing_response(self, res, peer):
287         """
288         I handle responses to the get_buckets() queries sent to the
289         readonly peers in Tahoe2PeerSelector.get_shareholders.
290         """
291         if isinstance(res, failure.Failure):
292             self.log("%s got error during existing shares check: %s"
293                     % (idlib.shortnodeid_b2a(peer), res),
294                     level=log.UNUSUAL)
295             self.error_count += 1
296             self.bad_query_count += 1
297         else:
298             buckets = res
299             if buckets:
300                 self.peers_with_shares.add(peer)
301             self.log("response to get_buckets() from peer %s: alreadygot=%s"
302                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
303                     level=log.NOISY)
304             for bucket in buckets:
305                 self.preexisting_shares.setdefault(bucket, set()).add(peer)
306                 self.homeless_shares.discard(bucket)
307             self.full_count += 1
308             self.bad_query_count += 1
309
310
311     def _get_progress_message(self):
312         if not self.homeless_shares:
313             msg = "placed all %d shares, " % (self.total_shares)
314         else:
315             msg = ("placed %d shares out of %d total (%d homeless), " %
316                    (self.total_shares - len(self.homeless_shares),
317                     self.total_shares,
318                     len(self.homeless_shares)))
319         return (msg + "want to place shares on at least %d servers such that "
320                       "any %d of them have enough shares to recover the file, "
321                       "sent %d queries to %d peers, "
322                       "%d queries placed some shares, %d placed none "
323                       "(of which %d placed none due to the server being"
324                       " full and %d placed none due to an error)" %
325                         (self.servers_of_happiness, self.needed_shares,
326                          self.query_count, self.num_peers_contacted,
327                          self.good_query_count, self.bad_query_count,
328                          self.full_count, self.error_count))
329
330
331     def _loop(self):
332         if not self.homeless_shares:
333             merged = merge_peers(self.preexisting_shares, self.use_peers)
334             effective_happiness = servers_of_happiness(merged)
335             if self.servers_of_happiness <= effective_happiness:
336                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
337                     "self.use_peers: %s, self.preexisting_shares: %s") \
338                         % (self, self._get_progress_message(),
339                         pretty_print_shnum_to_servers(merged),
340                         [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
341                             for p in self.use_peers],
342                         pretty_print_shnum_to_servers(self.preexisting_shares))
343                 self.log(msg, level=log.OPERATIONAL)
344                 return (self.use_peers, self.preexisting_shares)
345             else:
346                 # We're not okay right now, but maybe we can fix it by
347                 # redistributing some shares. In cases where one or two
348                 # servers have, before the upload, all or most of the
349                 # shares for a given SI, this can work by allowing _loop
350                 # a chance to spread those out over the other peers.
351                 delta = self.servers_of_happiness - effective_happiness
352                 shares = shares_by_server(self.preexisting_shares)
353                 # Each server in shares maps to a set of shares stored on it.
354                 # Since we want to keep at least one share on each server
355                 # that has one (otherwise we'd only be making
356                 # the situation worse by removing distinct servers),
357                 # each server has len(its shares) - 1 to spread around.
358                 shares_to_spread = sum([len(list(sharelist)) - 1
359                                         for (server, sharelist)
360                                         in shares.items()])
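                # (Worked example, not in the original: if server A holds
                # shares {0, 1, 2} and server B holds {3}, then
                # shares_to_spread = (3-1) + (1-1) = 2: A can give up two
                # shares while both servers still keep at least one.)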
361                 if delta <= len(self.uncontacted_peers) and \
362                    shares_to_spread >= delta:
363                     items = shares.items()
364                     while len(self.homeless_shares) < delta:
365                         # Loop through the allocated shares, removing
366                         # one from each server that has more than one
367                         # and putting it back into self.homeless_shares
368                         # until we've done this delta times.
369                         server, sharelist = items.pop()
370                         if len(sharelist) > 1:
371                             share = sharelist.pop()
372                             self.homeless_shares.add(share)
373                             self.preexisting_shares[share].remove(server)
374                             if not self.preexisting_shares[share]:
375                                 del self.preexisting_shares[share]
376                             items.append((server, sharelist))
377                         for writer in self.use_peers:
378                             writer.abort_some_buckets(self.homeless_shares)
379                     return self._loop()
380                 else:
381                     # Redistribution won't help us; fail.
382                     peer_count = len(self.peers_with_shares)
383                     failmsg = failure_message(peer_count,
384                                           self.needed_shares,
385                                           self.servers_of_happiness,
386                                           effective_happiness)
387                     servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
388                     servmsg = servmsgtempl % (
389                         self,
390                         failmsg,
391                         self._get_progress_message(),
392                         pretty_print_shnum_to_servers(merged)
393                         )
394                     self.log(servmsg, level=log.INFREQUENT)
395                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
396
397         if self.uncontacted_peers:
398             peer = self.uncontacted_peers.pop(0)
399             # TODO: don't pre-convert all peerids to PeerTrackers
400             assert isinstance(peer, PeerTracker)
401
402             shares_to_ask = set(sorted(self.homeless_shares)[:1])
403             self.homeless_shares -= shares_to_ask
404             self.query_count += 1
405             self.num_peers_contacted += 1
406             if self._status:
407                 self._status.set_status("Contacting Peers [%s] (first query),"
408                                         " %d shares left.."
409                                         % (idlib.shortnodeid_b2a(peer.peerid),
410                                            len(self.homeless_shares)))
411             d = peer.query(shares_to_ask)
412             d.addBoth(self._got_response, peer, shares_to_ask,
413                       self.contacted_peers)
414             return d
415         elif self.contacted_peers:
416             # ask a peer that we've already asked.
417             if not self._started_second_pass:
418                 self.log("starting second pass",
419                         level=log.NOISY)
420                 self._started_second_pass = True
421             num_shares = mathutil.div_ceil(len(self.homeless_shares),
422                                            len(self.contacted_peers))
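            # (e.g. 7 homeless shares across 3 contacted peers gives
            # div_ceil(7, 3) = 3 shares to ask of this peer; not in the
            # original, added for illustration)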
423             peer = self.contacted_peers.pop(0)
424             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
425             self.homeless_shares -= shares_to_ask
426             self.query_count += 1
427             if self._status:
428                 self._status.set_status("Contacting Peers [%s] (second query),"
429                                         " %d shares left.."
430                                         % (idlib.shortnodeid_b2a(peer.peerid),
431                                            len(self.homeless_shares)))
432             d = peer.query(shares_to_ask)
433             d.addBoth(self._got_response, peer, shares_to_ask,
434                       self.contacted_peers2)
435             return d
436         elif self.contacted_peers2:
437             # we've finished the second-or-later pass. Move all the remaining
438             # peers back into self.contacted_peers for the next pass.
439             self.contacted_peers.extend(self.contacted_peers2)
440             self.contacted_peers2[:] = []
441             return self._loop()
442         else:
443             # no more peers. If we haven't placed enough shares, we fail.
444             merged = merge_peers(self.preexisting_shares, self.use_peers)
445             effective_happiness = servers_of_happiness(merged)
446             if effective_happiness < self.servers_of_happiness:
447                 msg = failure_message(len(self.peers_with_shares),
448                                       self.needed_shares,
449                                       self.servers_of_happiness,
450                                       effective_happiness)
451                 msg = ("peer selection failed for %s: %s (%s)" % (self,
452                                 msg,
453                                 self._get_progress_message()))
454                 if self.last_failure_msg:
455                     msg += " (%s)" % (self.last_failure_msg,)
456                 self.log(msg, level=log.UNUSUAL)
457                 return self._failed(msg)
458             else:
459                 # we placed enough to be happy, so we're done
460                 if self._status:
461                     self._status.set_status("Placed all shares")
462                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
463                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
464                 self.log(msg, level=log.OPERATIONAL)
465                 return (self.use_peers, self.preexisting_shares)
466
467     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
468         if isinstance(res, failure.Failure):
469             # This is unusual, and probably indicates a bug or a network
470             # problem.
471             self.log("%s got error during peer selection: %s" % (peer, res),
472                     level=log.UNUSUAL)
473             self.error_count += 1
474             self.bad_query_count += 1
475             self.homeless_shares |= shares_to_ask
476             if (self.uncontacted_peers
477                 or self.contacted_peers
478                 or self.contacted_peers2):
479                 # there is still hope, so just loop
480                 pass
481             else:
482                 # No more peers, so this upload might fail (it depends upon
483                 # whether we've hit servers_of_happiness or not). Log the last
484                 # failure we got: if a coding error causes all peers to fail
485                 # in the same way, this allows the common failure to be seen
486                 # by the uploader and should help with debugging
487                 msg = ("last failure (from %s) was: %s" % (peer, res))
488                 self.last_failure_msg = msg
489         else:
490             (alreadygot, allocated) = res
491             self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
492                     % (idlib.shortnodeid_b2a(peer.peerid),
493                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
494                     level=log.NOISY)
495             progress = False
496             for s in alreadygot:
497                 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
498                 if s in self.homeless_shares:
499                     self.homeless_shares.remove(s)
500                     progress = True
501                 elif s in shares_to_ask:
502                     progress = True
503
504             # the PeerTracker will remember which shares were allocated on
505             # that peer. We just have to remember to use them.
506             if allocated:
507                 self.use_peers.add(peer)
508                 progress = True
509
510             if allocated or alreadygot:
511                 self.peers_with_shares.add(peer.peerid)
512
513             not_yet_present = set(shares_to_ask) - set(alreadygot)
514             still_homeless = not_yet_present - set(allocated)
515
516             if progress:
517                 # They accepted at least one of the shares that we asked
518                 # them to accept, or they had a share that we didn't ask
519                 # them to accept but that we hadn't placed yet, so this
520                 # was a productive query
521                 self.good_query_count += 1
522             else:
523                 self.bad_query_count += 1
524                 self.full_count += 1
525
526             if still_homeless:
527                 # In networks with lots of space, this is very unusual and
528                 # probably indicates an error. In networks with peers that
529                 # are full, it is merely unusual. In networks that are very
530                 # full, it is common, and many uploads will fail. In most
531                 # cases, this is obviously not fatal, and we'll just use some
532                 # other peers.
533
534                 # some shares are still homeless, keep trying to find them a
535                 # home. The ones that were rejected get first priority.
536                 self.homeless_shares |= still_homeless
537                 # They were unable to accept all of our requests, so it
538                 # is safe to assume that asking them again won't help.
539             else:
540                 # if they *were* able to accept everything, they might be
541                 # willing to accept even more.
542                 put_peer_here.append(peer)
543
544         # now loop
545         return self._loop()
546
547
548     def _failed(self, msg):
549         """
550         I am called when peer selection fails. I first abort all of the
551         remote buckets that I allocated during my unsuccessful attempt to
552         place shares for this file. I then raise an
553         UploadUnhappinessError with my msg argument.
554         """
555         for peer in self.use_peers:
556             assert isinstance(peer, PeerTracker)
557
558             peer.abort()
559
560         raise UploadUnhappinessError(msg)
561
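# Illustrative sketch (not part of the original module): the selector's
# contract as CHKUploader.locate_all_shareholders uses it below. The
# Deferred fires with (upload_servers, already_peers) on success and
# errbacks with UploadUnhappinessError otherwise:
#
#   sel = Tahoe2PeerSelector(upload_id, logparent, upload_status)
#   d = sel.get_shareholders(storage_broker, secret_holder, storage_index,
#                            share_size, block_size, num_segments,
#                            n, k, desired_happiness)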
562
563 class EncryptAnUploadable:
564     """This is a wrapper that takes an IUploadable and provides
565     IEncryptedUploadable."""
566     implements(IEncryptedUploadable)
567     CHUNKSIZE = 50*1024
568
569     def __init__(self, original, log_parent=None):
570         self.original = IUploadable(original)
571         self._log_number = log_parent
572         self._encryptor = None
573         self._plaintext_hasher = plaintext_hasher()
574         self._plaintext_segment_hasher = None
575         self._plaintext_segment_hashes = []
576         self._encoding_parameters = None
577         self._file_size = None
578         self._ciphertext_bytes_read = 0
579         self._status = None
580
581     def set_upload_status(self, upload_status):
582         self._status = IUploadStatus(upload_status)
583         self.original.set_upload_status(upload_status)
584
585     def log(self, *args, **kwargs):
586         if "facility" not in kwargs:
587             kwargs["facility"] = "upload.encryption"
588         if "parent" not in kwargs:
589             kwargs["parent"] = self._log_number
590         return log.msg(*args, **kwargs)
591
592     def get_size(self):
593         if self._file_size is not None:
594             return defer.succeed(self._file_size)
595         d = self.original.get_size()
596         def _got_size(size):
597             self._file_size = size
598             if self._status:
599                 self._status.set_size(size)
600             return size
601         d.addCallback(_got_size)
602         return d
603
604     def get_all_encoding_parameters(self):
605         if self._encoding_parameters is not None:
606             return defer.succeed(self._encoding_parameters)
607         d = self.original.get_all_encoding_parameters()
608         def _got(encoding_parameters):
609             (k, happy, n, segsize) = encoding_parameters
610             self._segment_size = segsize # used by segment hashers
611             self._encoding_parameters = encoding_parameters
612             self.log("my encoding parameters: %s" % (encoding_parameters,),
613                      level=log.NOISY)
614             return encoding_parameters
615         d.addCallback(_got)
616         return d
617
618     def _get_encryptor(self):
619         if self._encryptor:
620             return defer.succeed(self._encryptor)
621
622         d = self.original.get_encryption_key()
623         def _got(key):
624             e = AES(key)
625             self._encryptor = e
626
627             storage_index = storage_index_hash(key)
628             assert isinstance(storage_index, str)
629             # There's no point to having the SI be longer than the key, so we
630             # specify that it is truncated to the same 128 bits as the AES key.
631             assert len(storage_index) == 16  # SHA-256 truncated to 128b
632             self._storage_index = storage_index
633             if self._status:
634                 self._status.set_storage_index(storage_index)
635             return e
636         d.addCallback(_got)
637         return d
638
639     def get_storage_index(self):
640         d = self._get_encryptor()
641         d.addCallback(lambda res: self._storage_index)
642         return d
643
644     def _get_segment_hasher(self):
645         p = self._plaintext_segment_hasher
646         if p:
647             left = self._segment_size - self._plaintext_segment_hashed_bytes
648             return p, left
649         p = plaintext_segment_hasher()
650         self._plaintext_segment_hasher = p
651         self._plaintext_segment_hashed_bytes = 0
652         return p, self._segment_size
653
654     def _update_segment_hash(self, chunk):
655         offset = 0
656         while offset < len(chunk):
657             p, segment_left = self._get_segment_hasher()
658             chunk_left = len(chunk) - offset
659             this_segment = min(chunk_left, segment_left)
660             p.update(chunk[offset:offset+this_segment])
661             self._plaintext_segment_hashed_bytes += this_segment
662
663             if self._plaintext_segment_hashed_bytes == self._segment_size:
664                 # we've filled this segment
665                 self._plaintext_segment_hashes.append(p.digest())
666                 self._plaintext_segment_hasher = None
667                 self.log("closed hash [%d]: %dB" %
668                          (len(self._plaintext_segment_hashes)-1,
669                           self._plaintext_segment_hashed_bytes),
670                          level=log.NOISY)
671                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
672                          segnum=len(self._plaintext_segment_hashes)-1,
673                          hash=base32.b2a(p.digest()),
674                          level=log.NOISY)
675
676             offset += this_segment
677
678
679     def read_encrypted(self, length, hash_only):
680         # make sure our parameters have been set up first
681         d = self.get_all_encoding_parameters()
682         # and size
683         d.addCallback(lambda ignored: self.get_size())
684         d.addCallback(lambda ignored: self._get_encryptor())
685         # then fetch and encrypt the plaintext. The unusual structure here
686         # (passing a Deferred *into* a function) is needed to avoid
687         # overflowing the stack: Deferreds don't optimize out tail recursion.
688         # We also pass in a list, to which _read_encrypted will append
689         # ciphertext.
690         ciphertext = []
691         d2 = defer.Deferred()
692         d.addCallback(lambda ignored:
693                       self._read_encrypted(length, ciphertext, hash_only, d2))
694         d.addCallback(lambda ignored: d2)
695         return d
696
697     def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
698         if not remaining:
699             fire_when_done.callback(ciphertext)
700             return None
701         # tolerate large length= values without consuming a lot of RAM by
702         # reading just a chunk (say 50kB) at a time. This only really matters
703         # when hash_only==True (i.e. resuming an interrupted upload), since
704         # that's the case where we will be skipping over a lot of data.
705         size = min(remaining, self.CHUNKSIZE)
706         remaining = remaining - size
707         # read a chunk of plaintext..
708         d = defer.maybeDeferred(self.original.read, size)
709         # N.B.: if read() is synchronous, then since everything else is
710         # actually synchronous too, we'd blow the stack unless we stall for a
711         # tick. Once you accept a Deferred from IUploadable.read(), you must
712         # be prepared to have it fire immediately too.
713         d.addCallback(fireEventually)
714         def _good(plaintext):
715             # and encrypt it..
716             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
717             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
718             ciphertext.extend(ct)
719             self._read_encrypted(remaining, ciphertext, hash_only,
720                                  fire_when_done)
721         def _err(why):
722             fire_when_done.errback(why)
723         d.addCallback(_good)
724         d.addErrback(_err)
725         return None
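
    # Illustrative sketch, not part of the original module: the same
    # "pass a Deferred into the recursion" pattern in isolation. Each
    # step reschedules itself on a fresh reactor tick via fireEventually,
    # so an arbitrarily long loop never deepens the Python stack.
    #
    #   def count_down(n, done):
    #       if n == 0:
    #           done.callback("finished")
    #           return
    #       d = fireEventually(None)
    #       d.addCallback(lambda ign: count_down(n - 1, done))
    #
    #   done = defer.Deferred()
    #   count_down(100000, done)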
726
727     def _hash_and_encrypt_plaintext(self, data, hash_only):
728         assert isinstance(data, (tuple, list)), type(data)
729         data = list(data)
730         cryptdata = []
731         # we use data.pop(0) instead of 'for chunk in data' to save
732         # memory: each chunk is destroyed as soon as we're done with it.
733         bytes_processed = 0
734         while data:
735             chunk = data.pop(0)
736             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
737                      level=log.NOISY)
738             bytes_processed += len(chunk)
739             self._plaintext_hasher.update(chunk)
740             self._update_segment_hash(chunk)
741             # TODO: we have to encrypt the data (even if hash_only==True)
742             # because pycryptopp's AES-CTR implementation doesn't offer a
743             # way to change the counter value. Once pycryptopp acquires
744             # this ability, change this to simply update the counter
745             # before each call to (hash_only==False) _encryptor.process()
746             ciphertext = self._encryptor.process(chunk)
747             if hash_only:
748                 self.log("  skipping encryption", level=log.NOISY)
749             else:
750                 cryptdata.append(ciphertext)
751             del ciphertext
752             del chunk
753         self._ciphertext_bytes_read += bytes_processed
754         if self._status:
755             progress = float(self._ciphertext_bytes_read) / self._file_size
756             self._status.set_progress(1, progress)
757         return cryptdata
758
759
760     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
761         # this is currently unused, but will live again when we fix #453
762         if len(self._plaintext_segment_hashes) < num_segments:
763             # close out the last one
764             assert len(self._plaintext_segment_hashes) == num_segments-1
765             p, segment_left = self._get_segment_hasher()
766             self._plaintext_segment_hashes.append(p.digest())
767             del self._plaintext_segment_hasher
768             self.log("closing plaintext leaf hasher, hashed %d bytes" %
769                      self._plaintext_segment_hashed_bytes,
770                      level=log.NOISY)
771             self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
772                      segnum=len(self._plaintext_segment_hashes)-1,
773                      hash=base32.b2a(p.digest()),
774                      level=log.NOISY)
775         assert len(self._plaintext_segment_hashes) == num_segments
776         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
777
778     def get_plaintext_hash(self):
779         h = self._plaintext_hasher.digest()
780         return defer.succeed(h)
781
782     def close(self):
783         return self.original.close()
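
# Illustrative sketch (not part of the original module): how a caller
# typically drives EncryptAnUploadable. 'uploadable' is assumed to be
# any IUploadable provider:
#
#   eu = EncryptAnUploadable(uploadable)
#   d = eu.get_storage_index()      # the SI is derived from the AES key
#   d.addCallback(lambda si: eu.read_encrypted(1024, hash_only=False))
#   d.addCallback(lambda chunks: "".join(chunks))   # ciphertext bytes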
784
785 class UploadStatus:
786     implements(IUploadStatus)
787     statusid_counter = itertools.count(0)
788
789     def __init__(self):
790         self.storage_index = None
791         self.size = None
792         self.helper = False
793         self.status = "Not started"
794         self.progress = [0.0, 0.0, 0.0]
795         self.active = True
796         self.results = None
797         self.counter = self.statusid_counter.next()
798         self.started = time.time()
799
800     def get_started(self):
801         return self.started
802     def get_storage_index(self):
803         return self.storage_index
804     def get_size(self):
805         return self.size
806     def using_helper(self):
807         return self.helper
808     def get_status(self):
809         return self.status
810     def get_progress(self):
811         return tuple(self.progress)
812     def get_active(self):
813         return self.active
814     def get_results(self):
815         return self.results
816     def get_counter(self):
817         return self.counter
818
819     def set_storage_index(self, si):
820         self.storage_index = si
821     def set_size(self, size):
822         self.size = size
823     def set_helper(self, helper):
824         self.helper = helper
825     def set_status(self, status):
826         self.status = status
827     def set_progress(self, which, value):
828         # [0]: chk, [1]: ciphertext, [2]: encode+push
829         self.progress[which] = value
830     def set_active(self, value):
831         self.active = value
832     def set_results(self, value):
833         self.results = value
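
# Illustrative note (not part of the original module): the three
# progress slots are written by different phases of an upload, e.g.:
#
#   st = UploadStatus()
#   st.set_progress(1, 0.5)    # ciphertext half fetched
#   st.get_progress()          # returns (0.0, 0.5, 0.0)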
834
835 class CHKUploader:
836     peer_selector_class = Tahoe2PeerSelector
837
838     def __init__(self, storage_broker, secret_holder):
839         # peer_selector needs storage_broker and secret_holder
840         self._storage_broker = storage_broker
841         self._secret_holder = secret_holder
842         self._log_number = self.log("CHKUploader starting", parent=None)
843         self._encoder = None
844         self._results = UploadResults()
845         self._storage_index = None
846         self._upload_status = UploadStatus()
847         self._upload_status.set_helper(False)
848         self._upload_status.set_active(True)
849         self._upload_status.set_results(self._results)
850
851         # locate_all_shareholders() will create the following attribute:
852         # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
853
854     def log(self, *args, **kwargs):
855         if "parent" not in kwargs:
856             kwargs["parent"] = self._log_number
857         if "facility" not in kwargs:
858             kwargs["facility"] = "tahoe.upload"
859         return log.msg(*args, **kwargs)
860
861     def start(self, encrypted_uploadable):
862         """Start uploading the file.
863
864         Returns a Deferred that will fire with the UploadResults instance.
865         """
866
867         self._started = time.time()
868         eu = IEncryptedUploadable(encrypted_uploadable)
869         self.log("starting upload of %s" % eu)
870
871         eu.set_upload_status(self._upload_status)
872         d = self.start_encrypted(eu)
873         def _done(uploadresults):
874             self._upload_status.set_active(False)
875             return uploadresults
876         d.addBoth(_done)
877         return d
878
879     def abort(self):
880         """Call this if the upload must be abandoned before it completes.
881         This will tell the shareholders to delete their partial shares. I
882         return a Deferred that fires when these messages have been acked."""
883         if not self._encoder:
884             # how did you call abort() before calling start() ?
885             return defer.succeed(None)
886         return self._encoder.abort()
887
888     def start_encrypted(self, encrypted):
889         """ Returns a Deferred that will fire with the UploadResults instance. """
890         eu = IEncryptedUploadable(encrypted)
891
892         started = time.time()
893         self._encoder = e = encode.Encoder(self._log_number,
894                                            self._upload_status)
895         d = e.set_encrypted_uploadable(eu)
896         d.addCallback(self.locate_all_shareholders, started)
897         d.addCallback(self.set_shareholders, e)
898         d.addCallback(lambda res: e.start())
899         d.addCallback(self._encrypted_done)
900         return d
901
902     def locate_all_shareholders(self, encoder, started):
903         peer_selection_started = now = time.time()
904         self._storage_index_elapsed = now - started
905         storage_broker = self._storage_broker
906         secret_holder = self._secret_holder
907         storage_index = encoder.get_param("storage_index")
908         self._storage_index = storage_index
909         upload_id = si_b2a(storage_index)[:5]
910         self.log("using storage index %s" % upload_id)
911         peer_selector = self.peer_selector_class(upload_id, self._log_number,
912                                                  self._upload_status)
913
914         share_size = encoder.get_param("share_size")
915         block_size = encoder.get_param("block_size")
916         num_segments = encoder.get_param("num_segments")
917         k,desired,n = encoder.get_param("share_counts")
918
919         self._peer_selection_started = time.time()
920         d = peer_selector.get_shareholders(storage_broker, secret_holder,
921                                            storage_index,
922                                            share_size, block_size,
923                                            num_segments, n, k, desired)
924         def _done(res):
925             self._peer_selection_elapsed = time.time() - peer_selection_started
926             return res
927         d.addCallback(_done)
928         return d
929
930     def set_shareholders(self, (upload_servers, already_peers), encoder):
931         """
932         @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
933             shares for us (the shareids are stashed inside the PeerTracker)
934         @param already_peers: a dict mapping sharenum to a set of peerids
935                               that claim to already have this share
936         """
937         msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
938         values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
939             for p in upload_servers], already_peers)
940         self.log(msgtempl % values, level=log.OPERATIONAL)
941         # record already-present shares in self._results
942         self._results.preexisting_shares = len(already_peers)
943
944         self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
945         for peer in upload_servers:
946             assert isinstance(peer, PeerTracker)
947         buckets = {}
948         servermap = already_peers.copy()
949         for peer in upload_servers:
950             buckets.update(peer.buckets)
951             for shnum in peer.buckets:
952                 self._peer_trackers[shnum] = peer
953                 servermap.setdefault(shnum, set()).add(peer.peerid)
954         assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
955             "%s (%s) != %s (%s)" % (
956                 len(buckets),
957                 buckets,
958                 sum([len(peer.buckets) for peer in upload_servers]),
959                 [(p.buckets, p.peerid) for p in upload_servers]
960                 )
961         encoder.set_shareholders(buckets, servermap)
962
963     def _encrypted_done(self, verifycap):
964         """ Returns a Deferred that will fire with the UploadResults instance. """
965         r = self._results
966         for shnum in self._encoder.get_shares_placed():
967             peer_tracker = self._peer_trackers[shnum]
968             peerid = peer_tracker.peerid
969             r.sharemap.add(shnum, peerid)
970             r.servermap.add(peerid, shnum)
971         r.pushed_shares = len(self._encoder.get_shares_placed())
972         now = time.time()
973         r.file_size = self._encoder.file_size
974         r.timings["total"] = now - self._started
975         r.timings["storage_index"] = self._storage_index_elapsed
976         r.timings["peer_selection"] = self._peer_selection_elapsed
977         r.timings.update(self._encoder.get_times())
978         r.uri_extension_data = self._encoder.get_uri_extension_data()
979         r.verifycapstr = verifycap.to_string()
980         return r
981
982     def get_upload_status(self):
983         return self._upload_status
984
985 def read_this_many_bytes(uploadable, size, prepend_data=[]):
986     if size == 0:
987         return defer.succeed([])
988     d = uploadable.read(size)
989     def _got(data):
990         assert isinstance(data, list)
991         bytes = sum([len(piece) for piece in data])
992         assert bytes > 0
993         assert bytes <= size
994         remaining = size - bytes
995         if remaining:
996             return read_this_many_bytes(uploadable, remaining,
997                                         prepend_data + data)
998         return prepend_data + data
999     d.addCallback(_got)
1000     return d
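
# Illustrative sketch (not part of the original module): a minimal
# in-memory object with the read() contract that read_this_many_bytes
# expects; 'StringUploadable' is hypothetical.
#
#   class StringUploadable:
#       def __init__(self, data):
#           self._f = StringIO(data)
#       def read(self, size):
#           return defer.succeed([self._f.read(size)])
#
#   d = read_this_many_bytes(StringUploadable("hello world"), 11)
#   d.addCallback("".join)    # fires with "hello world"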
1001
1002 class LiteralUploader:
1003
1004     def __init__(self):
1005         self._results = UploadResults()
1006         self._status = s = UploadStatus()
1007         s.set_storage_index(None)
1008         s.set_helper(False)
1009         s.set_progress(0, 1.0)
1010         s.set_active(False)
1011         s.set_results(self._results)
1012
1013     def start(self, uploadable):
1014         uploadable = IUploadable(uploadable)
1015         d = uploadable.get_size()
1016         def _got_size(size):
1017             self._size = size
1018             self._status.set_size(size)
1019             self._results.file_size = size
1020             return read_this_many_bytes(uploadable, size)
1021         d.addCallback(_got_size)
1022         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1023         d.addCallback(lambda u: u.to_string())
1024         d.addCallback(self._build_results)
1025         return d
1026
1027     def _build_results(self, uri):
1028         self._results.uri = uri
1029         self._status.set_status("Finished")
1030         self._status.set_progress(1, 1.0)
1031         self._status.set_progress(2, 1.0)
1032         return self._results
1033
1034     def close(self):
1035         pass
1036
1037     def get_upload_status(self):
1038         return self._status
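
# Illustrative note (not part of the original module): literal uploads
# never touch storage servers; the whole file body is embedded in the
# cap itself. For example, uri.LiteralFileURI("hello").to_string()
# yields something like "URI:LIT:nbswy3dp" (base32 of the plaintext).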
1039
1040 class RemoteEncryptedUploadable(Referenceable):
1041     implements(RIEncryptedUploadable)
1042
1043     def __init__(self, encrypted_uploadable, upload_status):
1044         self._eu = IEncryptedUploadable(encrypted_uploadable)
1045         self._offset = 0
1046         self._bytes_sent = 0
1047         self._status = IUploadStatus(upload_status)
1048         # we are responsible for updating the status string while we run, and
1049         # for setting the ciphertext-fetch progress.
1050         self._size = None
1051
1052     def get_size(self):
1053         if self._size is not None:
1054             return defer.succeed(self._size)
1055         d = self._eu.get_size()
1056         def _got_size(size):
1057             self._size = size
1058             return size
1059         d.addCallback(_got_size)
1060         return d
1061
1062     def remote_get_size(self):
1063         return self.get_size()
1064     def remote_get_all_encoding_parameters(self):
1065         return self._eu.get_all_encoding_parameters()
1066
1067     def _read_encrypted(self, length, hash_only):
1068         d = self._eu.read_encrypted(length, hash_only)
1069         def _read(strings):
1070             if hash_only:
1071                 self._offset += length
1072             else:
1073                 size = sum([len(data) for data in strings])
1074                 self._offset += size
1075             return strings
1076         d.addCallback(_read)
1077         return d
1078
1079     def remote_read_encrypted(self, offset, length):
1080         # we don't support seeking backwards, but we allow skipping forwards
1081         precondition(offset >= 0, offset)
1082         precondition(length >= 0, length)
1083         lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1084                      level=log.NOISY)
1085         precondition(offset >= self._offset, offset, self._offset)
1086         if offset > self._offset:
1087             # read the data from disk anyways, to build up the hash tree
1088             skip = offset - self._offset
1089             log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1090                     (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1091             d = self._read_encrypted(skip, hash_only=True)
1092         else:
1093             d = defer.succeed(None)
1094
1095         def _at_correct_offset(res):
1096             assert offset == self._offset, "%d != %d" % (offset, self._offset)
1097             return self._read_encrypted(length, hash_only=False)
1098         d.addCallback(_at_correct_offset)
1099
1100         def _read(strings):
1101             size = sum([len(data) for data in strings])
1102             self._bytes_sent += size
1103             return strings
1104         d.addCallback(_read)
1105         return d
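        # (Illustrative, not in the original: a helper that already holds
        # bytes 0..699 of ciphertext calls remote_read_encrypted(700, N);
        # the 700-byte gap is read with hash_only=True so the plaintext
        # hash trees still get built without resending those bytes.)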
1106
1107     def remote_close(self):
1108         return self._eu.close()
1109
1110
1111 class AssistedUploader:
1112
1113     def __init__(self, helper):
1114         self._helper = helper
1115         self._log_number = log.msg("AssistedUploader starting")
1116         self._storage_index = None
1117         self._upload_status = s = UploadStatus()
1118         s.set_helper(True)
1119         s.set_active(True)
1120
1121     def log(self, *args, **kwargs):
1122         if "parent" not in kwargs:
1123             kwargs["parent"] = self._log_number
1124         return log.msg(*args, **kwargs)
1125
1126     def start(self, encrypted_uploadable, storage_index):
1127         """Start uploading the file.
1128
1129         Returns a Deferred that will fire with the UploadResults instance.
1130         """
1131         precondition(isinstance(storage_index, str), storage_index)
1132         self._started = time.time()
1133         eu = IEncryptedUploadable(encrypted_uploadable)
1134         eu.set_upload_status(self._upload_status)
1135         self._encuploadable = eu
1136         self._storage_index = storage_index
1137         d = eu.get_size()
1138         d.addCallback(self._got_size)
1139         d.addCallback(lambda res: eu.get_all_encoding_parameters())
1140         d.addCallback(self._got_all_encoding_parameters)
1141         d.addCallback(self._contact_helper)
1142         d.addCallback(self._build_verifycap)
1143         def _done(res):
1144             self._upload_status.set_active(False)
1145             return res
1146         d.addBoth(_done)
1147         return d
1148
1149     def _got_size(self, size):
1150         self._size = size
1151         self._upload_status.set_size(size)
1152
1153     def _got_all_encoding_parameters(self, params):
1154         k, happy, n, segment_size = params
1155         # stash these for URI generation later
1156         self._needed_shares = k
1157         self._total_shares = n
1158         self._segment_size = segment_size
1159
1160     def _contact_helper(self, res):
1161         now = self._time_contacting_helper_start = time.time()
1162         self._storage_index_elapsed = now - self._started
1163         self.log(format="contacting helper for SI %(si)s..",
1164                  si=si_b2a(self._storage_index), level=log.NOISY)
1165         self._upload_status.set_status("Contacting Helper")
1166         d = self._helper.callRemote("upload_chk", self._storage_index)
1167         d.addCallback(self._contacted_helper)
1168         return d
1169
1170     def _contacted_helper(self, (upload_results, upload_helper)):
1171         now = time.time()
1172         elapsed = now - self._time_contacting_helper_start
1173         self._elapsed_time_contacting_helper = elapsed
1174         if upload_helper:
1175             self.log("helper says we need to upload", level=log.NOISY)
1176             self._upload_status.set_status("Uploading Ciphertext")
1177             # we need to upload the file
1178             reu = RemoteEncryptedUploadable(self._encuploadable,
1179                                             self._upload_status)
1180             # let it pre-compute the size for progress purposes
1181             d = reu.get_size()
1182             d.addCallback(lambda ignored:
1183                           upload_helper.callRemote("upload", reu))
1184             # this Deferred will fire with the upload results
1185             return d
1186         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1187         self._upload_status.set_progress(1, 1.0)
1188         self._upload_status.set_results(upload_results)
1189         return upload_results
1190
1191     def _convert_old_upload_results(self, upload_results):
1192         # pre-1.3.0 helpers return upload results which contain a mapping
1193         # from shnum to a single human-readable string, containing things
1194         # like "Found on [x],[y],[z]" (for healthy files that were already in
1195         # the grid), "Found on [x]" (for files that needed upload but which
1196         # discovered pre-existing shares), and "Placed on [x]" (for newly
1197         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1198         # set of binary serverid strings.
1199
1200         # the old results are too hard to deal with (they don't even contain
1201         # as much information as the new results, since the nodeids are
1202         # abbreviated), so if we detect old results, just clobber them.
1203
1204         sharemap = upload_results.sharemap
1205         if any(isinstance(v, str) for v in sharemap.values()):
1206             upload_results.sharemap = None
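    # For illustration (all values made up): an old-style sharemap value is a
    # human-readable string, while a new-style value is a set of binary
    # serverids:
    #   old: {0: "Found on [xgru5adv]"}
    #   new: {0: set(["\x9d\x04\x1c..."])}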
1207
1208     def _build_verifycap(self, upload_results):
1209         self.log("upload finished, building readcap", level=log.OPERATIONAL)
1210         self._convert_old_upload_results(upload_results)
1211         self._upload_status.set_status("Building Readcap")
1212         r = upload_results
1213         assert r.uri_extension_data["needed_shares"] == self._needed_shares
1214         assert r.uri_extension_data["total_shares"] == self._total_shares
1215         assert r.uri_extension_data["segment_size"] == self._segment_size
1216         assert r.uri_extension_data["size"] == self._size
1217         r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1218                                                 uri_extension_hash=r.uri_extension_hash,
1219                                                 needed_shares=self._needed_shares,
1220                                                 total_shares=self._total_shares,
1221                                                 size=self._size).to_string()
1222         now = time.time()
1223         r.file_size = self._size
1224         r.timings["storage_index"] = self._storage_index_elapsed
1225         r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1226         if "total" in r.timings:
1227             r.timings["helper_total"] = r.timings["total"]
1228         r.timings["total"] = now - self._started
1229         self._upload_status.set_status("Finished")
1230         self._upload_status.set_results(r)
1231         return r
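    # The verify cap built above is a printable string, schematically
    # (fields abbreviated):
    #   URI:CHK-Verifier:<storage_index b32>:<uri_extension_hash b32>:k:N:size
    # It permits integrity checking without granting read access;
    # turn_verifycap_into_read_cap() in Uploader.upload() (below) combines it
    # with the encryption key to produce the read cap.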
1232
1233     def get_upload_status(self):
1234         return self._upload_status
1235
1236 class BaseUploadable:
1237     default_max_segment_size = 128*KiB # overridden by max_segment_size
1238     default_encoding_param_k = 3 # overridden by encoding_parameters
1239     default_encoding_param_happy = 7
1240     default_encoding_param_n = 10
1241
1242     max_segment_size = None
1243     encoding_param_k = None
1244     encoding_param_happy = None
1245     encoding_param_n = None
1246
1247     _all_encoding_parameters = None
1248     _status = None
1249
1250     def set_upload_status(self, upload_status):
1251         self._status = IUploadStatus(upload_status)
1252
1253     def set_default_encoding_parameters(self, default_params):
1254         assert isinstance(default_params, dict)
1255         for k,v in default_params.items():
1256             precondition(isinstance(k, str), k, v)
1257             precondition(isinstance(v, int), k, v)
1258         if "k" in default_params:
1259             self.default_encoding_param_k = default_params["k"]
1260         if "happy" in default_params:
1261             self.default_encoding_param_happy = default_params["happy"]
1262         if "n" in default_params:
1263             self.default_encoding_param_n = default_params["n"]
1264         if "max_segment_size" in default_params:
1265             self.default_max_segment_size = default_params["max_segment_size"]
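    # Example (illustrative values, matching the class-level defaults above):
    #   uploadable.set_default_encoding_parameters(
    #       {"k": 3, "happy": 7, "n": 10, "max_segment_size": 128*1024})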
1266
1267     def get_all_encoding_parameters(self):
1268         if self._all_encoding_parameters:
1269             return defer.succeed(self._all_encoding_parameters)
1270
1271         max_segsize = self.max_segment_size or self.default_max_segment_size
1272         k = self.encoding_param_k or self.default_encoding_param_k
1273         happy = self.encoding_param_happy or self.default_encoding_param_happy
1274         n = self.encoding_param_n or self.default_encoding_param_n
1275
1276         d = self.get_size()
1277         def _got_size(file_size):
1278             # for small files, shrink the segment size to avoid wasting space
1279             segsize = min(max_segsize, file_size)
1280             # this must be a multiple of 'required_shares'==k
1281             segsize = mathutil.next_multiple(segsize, k)
1282             encoding_parameters = (k, happy, n, segsize)
1283             self._all_encoding_parameters = encoding_parameters
1284             return encoding_parameters
1285         d.addCallback(_got_size)
1286         return d
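    # Worked example (illustrative): with k=3 and the default 128 KiB
    # max_segsize, a 10000-byte file gets segsize = min(131072, 10000) =
    # 10000, which next_multiple() rounds up to 10002, the next multiple
    # of k.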
1287
1288 class FileHandle(BaseUploadable):
1289     implements(IUploadable)
1290
1291     def __init__(self, filehandle, convergence):
1292         """
1293         Upload the data from the filehandle.  If convergence is None, a
1294         random encryption key will be used; otherwise the encryption key is
1295         derived by hashing the plaintext together with the encoding
1296         parameters and the string in the "convergence" argument.
1297         """
1298         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1299         self._filehandle = filehandle
1300         self._key = None
1301         self.convergence = convergence
1302         self._size = None
1303
1304     def _get_encryption_key_convergent(self):
1305         if self._key is not None:
1306             return defer.succeed(self._key)
1307
1308         d = self.get_size()
1309         # that sets self._size as a side-effect
1310         d.addCallback(lambda size: self.get_all_encoding_parameters())
1311         def _got(params):
1312             k, happy, n, segsize = params
1313             f = self._filehandle
1314             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1315             f.seek(0)
1316             BLOCKSIZE = 64*1024
1317             bytes_read = 0
1318             while True:
1319                 data = f.read(BLOCKSIZE)
1320                 if not data:
1321                     break
1322                 enckey_hasher.update(data)
1323                 # TODO: setting progress in a non-yielding loop is kind of
1324                 # pointless, but I'm anticipating (perhaps prematurely) the
1325                 # day when we use a slowjob or twisted's CooperatorService to
1326                 # make this yield time to other jobs.
1327                 bytes_read += len(data)
1328                 if self._status:
1329                     self._status.set_progress(0, float(bytes_read)/self._size)
1330             f.seek(0)
1331             self._key = enckey_hasher.digest()
1332             if self._status:
1333                 self._status.set_progress(0, 1.0)
1334             assert len(self._key) == 16
1335             return self._key
1336         d.addCallback(_got)
1337         return d
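    # Note the convergence property this provides: two uploads of identical
    # plaintext with the same "convergence" string and the same (k, n,
    # segsize) parameters derive the same 16-byte AES key, hence the same
    # storage index, allowing the grid to recognize the file as a duplicate.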
1338
1339     def _get_encryption_key_random(self):
1340         if self._key is None:
1341             self._key = os.urandom(16)
1342         return defer.succeed(self._key)
1343
1344     def get_encryption_key(self):
1345         if self.convergence is not None:
1346             return self._get_encryption_key_convergent()
1347         else:
1348             return self._get_encryption_key_random()
1349
1350     def get_size(self):
1351         if self._size is not None:
1352             return defer.succeed(self._size)
1353         self._filehandle.seek(0, 2) # seek to EOF to learn the size
1354         size = self._filehandle.tell()
1355         self._size = size
1356         self._filehandle.seek(0)
1357         return defer.succeed(size)
1358
1359     def read(self, length):
1360         return defer.succeed([self._filehandle.read(length)])
1361
1362     def close(self):
1363         # the originator of the filehandle reserves the right to close it
1364         pass
1365
1366 class FileName(FileHandle):
1367     def __init__(self, filename, convergence):
1368         """
1369         Upload the data from the named file.  If convergence is None, a
1370         random encryption key will be used; otherwise the encryption key is
1371         derived by hashing the plaintext together with the encoding
1372         parameters and the string in the "convergence" argument.
1373         """
1374         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1375         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1376     def close(self):
1377         FileHandle.close(self)
1378         self._filehandle.close()
1379
1380 class Data(FileHandle):
1381     def __init__(self, data, convergence):
1382         """
1383         Upload the data from the data argument.  If convergence is None, a
1384         random encryption key will be used; otherwise the encryption key is
1385         derived by hashing the plaintext together with the encoding
1386         parameters and the string in the "convergence" argument.
1387         """
1388         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1389         FileHandle.__init__(self, StringIO(data), convergence=convergence)
1390
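# For illustration, the three uploadable flavors wrap different data sources
# behind the same IUploadable interface (the path here is made up):
#   Data("some bytes", convergence=None)
#   FileName("/tmp/example.txt", convergence=None)
#   FileHandle(open("/tmp/example.txt", "rb"), convergence=None)
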
1391 class Uploader(service.MultiService, log.PrefixingLogMixin):
1392     """I am a service that allows file uploading. I am a service-child of the
1393     Client.
1394     """
1395     implements(IUploader)
1396     name = "uploader"
1397     URI_LIT_SIZE_THRESHOLD = 55
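    # Files no larger than this are handled by LiteralUploader (see upload()
    # below): their bytes are embedded directly in a LIT URI instead of being
    # encrypted and erasure-coded onto storage servers.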
1398
1399     def __init__(self, helper_furl=None, stats_provider=None):
1400         self._helper_furl = helper_furl
1401         self.stats_provider = stats_provider
1402         self._helper = None
1403         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1404         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1405         service.MultiService.__init__(self)
1406
1407     def startService(self):
1408         service.MultiService.startService(self)
1409         if self._helper_furl:
1410             self.parent.tub.connectTo(self._helper_furl,
1411                                       self._got_helper)
1412
1413     def _got_helper(self, helper):
1414         self.log("got helper connection, getting versions")
1415         default = {
1416             "http://allmydata.org/tahoe/protocols/helper/v1": {},
1417             "application-version": "unknown: no get_version()",
1418             }
1419         d = add_version_to_remote_reference(helper, default)
1420         d.addCallback(self._got_versioned_helper)
1421
1422     def _got_versioned_helper(self, helper):
1423         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1424         if needed not in helper.version:
1425             raise InsufficientVersionError(needed, helper.version)
1426         self._helper = helper
1427         helper.notifyOnDisconnect(self._lost_helper)
1428
1429     def _lost_helper(self):
1430         self._helper = None
1431
1432     def get_helper_info(self):
1433         # return a tuple of (helper_furl_or_None, connected_bool)
1434         return (self._helper_furl, bool(self._helper))
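    # For illustration: get_helper_info() yields (None, False) when no helper
    # is configured, or (schematically) ("pb://tubid@host:port/swissnum",
    # False) while the connection is still being (re)established.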
1435
1436
1437     def upload(self, uploadable, history=None):
1438         """
1439         Returns a Deferred that will fire with the UploadResults instance.
1440         """
1441         assert self.parent
1442         assert self.running
1443
1444         uploadable = IUploadable(uploadable)
1445         d = uploadable.get_size()
1446         def _got_size(size):
1447             default_params = self.parent.get_encoding_parameters()
1448             precondition(isinstance(default_params, dict), default_params)
1449             precondition("max_segment_size" in default_params, default_params)
1450             uploadable.set_default_encoding_parameters(default_params)
1451
1452             if self.stats_provider:
1453                 self.stats_provider.count('uploader.files_uploaded', 1)
1454                 self.stats_provider.count('uploader.bytes_uploaded', size)
1455
1456             if size <= self.URI_LIT_SIZE_THRESHOLD:
1457                 uploader = LiteralUploader()
1458                 return uploader.start(uploadable)
1459             else:
1460                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1461                 d2 = defer.succeed(None)
1462                 if self._helper:
1463                     uploader = AssistedUploader(self._helper)
1464                     d2.addCallback(lambda x: eu.get_storage_index())
1465                     d2.addCallback(lambda si: uploader.start(eu, si))
1466                 else:
1467                     storage_broker = self.parent.get_storage_broker()
1468                     secret_holder = self.parent._secret_holder
1469                     uploader = CHKUploader(storage_broker, secret_holder)
1470                     d2.addCallback(lambda x: uploader.start(eu))
1471
1472                 self._all_uploads[uploader] = None
1473                 if history:
1474                     history.add_upload(uploader.get_upload_status())
1475                 def turn_verifycap_into_read_cap(uploadresults):
1476                     # Generate the uri from the verifycap plus the key.
1477                     d3 = uploadable.get_encryption_key()
1478                     def put_readcap_into_results(key):
1479                         v = uri.from_string(uploadresults.verifycapstr)
1480                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1481                         uploadresults.uri = r.to_string()
1482                         return uploadresults
1483                     d3.addCallback(put_readcap_into_results)
1484                     return d3
1485                 d2.addCallback(turn_verifycap_into_read_cap)
1486                 return d2
1487         d.addCallback(_got_size)
1488         def _done(res):
1489             uploadable.close()
1490             return res
1491         d.addBoth(_done)
1492         return d
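
# Illustrative usage (a sketch, not part of the original module): from inside
# a running client, where 'client' is assumed to be the parent Client service:
#
#   uploader = client.getServiceNamed(Uploader.name)
#   d = uploader.upload(Data("some bytes", convergence=None))
#   def _uploaded(results):
#       print results.uri  # a URI:CHK: read cap (URI:LIT: for tiny files)
#   d.addCallback(_uploaded)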